[FLINK-5133] [core] Followups for ResourceSpec on DataSet / DataStream API

  - Correct some use of Preconditions.checkNotNull
  - Make 'resources' plural in all cases
  - Add comments to why the setters are commented out
  - Add @PublicEvolving annotations
  - Make the Scala API completeness test match Scala-esk versions of Java 
Getters


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9912de21
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9912de21
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9912de21

Branch: refs/heads/master
Commit: 9912de21a1053013a220707f8b3868bdbf93aaca
Parents: f37ed02
Author: Stephan Ewen <[email protected]>
Authored: Tue Feb 28 10:42:16 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Tue Feb 28 18:59:10 2017 +0100

----------------------------------------------------------------------
 .../flink/api/common/operators/Operator.java    |  41 ++++---
 .../api/common/operators/ResourceSpec.java      |  29 ++---
 .../flink/api/java/operators/DataSink.java      | 109 ++++++++++---------
 .../api/java/operators/DeltaIteration.java      | 105 +++++++++---------
 .../flink/api/java/operators/Operator.java      | 101 +++++++++--------
 .../api/java/operators/OperatorTranslation.java |  20 ++--
 .../apache/flink/optimizer/plan/PlanNode.java   |   8 +-
 .../org/apache/flink/api/scala/DataSet.scala    |  69 ++++++------
 .../streaming/api/datastream/DataStream.java    |  18 +--
 .../api/datastream/DataStreamSink.java          |  76 +++++++------
 .../datastream/SingleOutputStreamOperator.java  |  74 +++++++------
 .../flink/streaming/api/graph/StreamGraph.java  |   2 +-
 .../api/graph/StreamGraphGenerator.java         |   4 +-
 .../flink/streaming/api/graph/StreamNode.java   |  18 +--
 .../transformations/StreamTransformation.java   |  30 ++---
 .../flink/streaming/api/scala/DataStream.scala  |  58 +++++-----
 .../ScalaAPICompletenessTestBase.scala          |  36 +++---
 17 files changed, 424 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index a9dedfa..1905555 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -16,18 +16,20 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.operators;
 
 import java.util.List;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Visitable;
 
+import javax.annotation.Nullable;
+
 /**
 * Abstract base class for all operators. An operator is a source, sink, or it 
applies an operation to
 * one or more inputs, producing a result.
@@ -45,9 +47,11 @@ public abstract class Operator<OUT> implements 
Visitable<Operator<?>> {
                
        private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;  // the 
number of parallel instances to use
 
-       private ResourceSpec minResource;                       // the minimum 
resource of the contract instance.
+       @Nullable
+       private ResourceSpec minResources;          // the minimum resource of 
the contract instance.
 
-       private ResourceSpec preferredResource; // the preferred resource of 
the contract instance.
+       @Nullable
+       private ResourceSpec preferredResources;    // the preferred resource 
of the contract instance.
 
        /**
         * The return type of the user function.
@@ -190,35 +194,40 @@ public abstract class Operator<OUT> implements 
Visitable<Operator<?>> {
        }
 
        /**
-        * Gets the minimum resource for this contract instance. The minimum 
resource denotes how many
-        * resources will be needed in the minimum for the user function during 
the execution.
+        * Gets the minimum resources for this operator. The minimum resources 
denotes how many
+        * resources will be needed at least minimum for the operator or user 
function during the execution.
         *
-        * @return The minimum resource of this operator.
+        * @return The minimum resources of this operator.
         */
-       public ResourceSpec getMinResource() {
-               return this.minResource;
+       @Nullable
+       @PublicEvolving
+       public ResourceSpec getMinResources() {
+               return this.minResources;
        }
 
        /**
-        * Gets the preferred resource for this contract instance. The 
preferred resource denotes how many
+        * Gets the preferred resources for this contract instance. The 
preferred resources denote how many
         * resources will be needed in the maximum for the user function during 
the execution.
         *
         * @return The preferred resource of this operator.
         */
-       public ResourceSpec getPreferredResource() {
-               return this.preferredResource;
+       @Nullable
+       @PublicEvolving
+       public ResourceSpec getPreferredResources() {
+               return this.preferredResources;
        }
 
        /**
         * Sets the minimum and preferred resources for this contract instance. 
The resource denotes
         * how many memories and cpu cores of the user function will be 
consumed during the execution.
         *
-        * @param minResource The minimum resource of this operator.
-        * @param preferredResource The preferred resource of this operator.
+        * @param minResources The minimum resource of this operator.
+        * @param preferredResources The preferred resource of this operator.
         */
-       public void setResource(ResourceSpec minResource, ResourceSpec 
preferredResource) {
-               this.minResource = minResource;
-               this.preferredResource = preferredResource;
+       @PublicEvolving
+       public void setResource(ResourceSpec minResources, ResourceSpec 
preferredResources) {
+               this.minResources = minResources;
+               this.preferredResources = preferredResources;
        }
        
        

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
index 1387508..0ea289a 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
@@ -105,14 +105,12 @@ public class ResourceSpec implements Serializable {
         * @return The new resource with merged values.
         */
        public ResourceSpec merge(ResourceSpec other) {
-               ResourceSpec result = new ResourceSpec(
+               return new ResourceSpec(
                                Math.max(this.cpuCores, other.cpuCores),
                                this.heapMemoryInMB + other.heapMemoryInMB,
                                this.directMemoryInMB + other.directMemoryInMB,
                                this.nativeMemoryInMB + other.nativeMemoryInMB,
                                this.stateSizeInMB + other.stateSizeInMB);
-
-               return  result;
        }
 
        public double getCpuCores() {
@@ -141,12 +139,8 @@ public class ResourceSpec implements Serializable {
         * @return True if all the values are equal or greater than 0, 
otherwise false.
         */
        public boolean isValid() {
-               if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && 
this.directMemoryInMB >= 0 &&
-                               this.nativeMemoryInMB >= 0 && 
this.stateSizeInMB >= 0) {
-                       return true;
-               } else {
-                       return false;
-               }
+               return (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && 
this.directMemoryInMB >= 0 &&
+                               this.nativeMemoryInMB >= 0 && 
this.stateSizeInMB >= 0);
        }
 
        /**
@@ -162,11 +156,7 @@ public class ResourceSpec implements Serializable {
                int cmp3 = Integer.compare(this.directMemoryInMB, 
other.directMemoryInMB);
                int cmp4 = Integer.compare(this.nativeMemoryInMB, 
other.nativeMemoryInMB);
                int cmp5 = Integer.compare(this.stateSizeInMB, 
other.stateSizeInMB);
-               if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 
0) {
-                       return true;
-               } else {
-                       return false;
-               }
+               return (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && 
cmp5 <= 0);
        }
 
        @Override
@@ -186,6 +176,17 @@ public class ResourceSpec implements Serializable {
        }
 
        @Override
+       public int hashCode() {
+               final long cpuBits =  Double.doubleToLongBits(cpuCores);
+               int result = (int) (cpuBits ^ (cpuBits >>> 32));
+               result = 31 * result + heapMemoryInMB;
+               result = 31 * result + directMemoryInMB;
+               result = 31 * result + nativeMemoryInMB;
+               result = 31 * result + stateSizeInMB;
+               return result;
+       }
+
+       @Override
        public String toString() {
                return "ResourceSpec{" +
                                "cpuCores=" + cpuCores +

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 3be9cc0..369e013 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -52,9 +52,9 @@ public class DataSink<T> {
        
        private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
-       private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+       private ResourceSpec minResources = ResourceSpec.UNKNOWN;
 
-       private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+       private ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
 
        private Configuration parameters;
 
@@ -285,61 +285,70 @@ public class DataSink<T> {
        }
 
        /**
-        * Returns the minimum resource of this data sink. If no minimum 
resource has been set,
-        * it returns the default empty resource.
+        * Returns the minimum resources of this data sink. If no minimum 
resources have been set,
+        * this returns the default resource profile.
         *
-        * @return The minimum resource of this data sink.
+        * @return The minimum resources of this data sink.
         */
-       public ResourceSpec getMinResource() {
-               return this.minResource;
+       @PublicEvolving
+       public ResourceSpec getMinResources() {
+               return this.minResources;
        }
 
        /**
-        * Returns the preferred resource of this data sink. If no preferred 
resource has been set,
-        * it returns the default empty resource.
+        * Returns the preferred resources of this data sink. If no preferred 
resources have been set,
+        * this returns the default resource profile.
         *
-        * @return The preferred resource of this data sink.
+        * @return The preferred resources of this data sink.
         */
-       public ResourceSpec getPreferredResource() {
-               return this.preferredResource;
+       @PublicEvolving
+       public ResourceSpec getPreferredResources() {
+               return this.preferredResources;
        }
 
-       /**
-        * Sets the minimum and preferred resources for this data sink. This 
overrides the default empty resource.
-        *      The minimum resource must be satisfied and the preferred 
resource specifies the upper bound
-        * for dynamic resource resize.
-        *
-        * @param minResource The minimum resource for this data sink.
-        * @param preferredResource The preferred resource for this data sink.
-        * @return The data sink with set minimum and preferred resources.
-        */
-       /*
-       public DataSink<T> setResource(ResourceSpec minResource, ResourceSpec 
preferredResource) {
-               Preconditions.checkNotNull(minResource != null && 
preferredResource != null,
-                               "The min and preferred resources must be not 
null.");
-               Preconditions.checkArgument(minResource.isValid() && 
preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
-                               "The values in resource must be not less than 0 
and the preferred resource must be greater than the min resource.");
-
-               this.minResource = minResource;
-               this.preferredResource = preferredResource;
-
-               return this;
-       }*/
-
-       /**
-        * Sets the resource for this data sink. This overrides the default 
empty minimum and preferred resources.
-        *
-        * @param resource The resource for this data sink.
-        * @return The data sink with set minimum and preferred resources.
-        */
-       /*
-       public DataSink<T> setResource(ResourceSpec resource) {
-               Preconditions.checkNotNull(resource != null, "The resource must 
be not null.");
-               Preconditions.checkArgument(resource.isValid(), "The resource 
values must be greater than 0.");
-
-               this.minResource = resource;
-               this.preferredResource = resource;
-
-               return this;
-       }*/
+//     
---------------------------------------------------------------------------
+//      Fine-grained resource profiles are an incomplete work-in-progress 
feature
+//      The setters are hence commented out at this point.
+//     
---------------------------------------------------------------------------
+//
+//     /**
+//      * Sets the minimum and preferred resources for this data sink. This 
overrides the default empty resource.
+//      *      The minimum resource must be satisfied and the preferred 
resource specifies the upper bound
+//      * for dynamic resource resize.
+//      *
+//      * @param minResources The minimum resource for this data sink.
+//      * @param preferredResources The preferred resource for this data sink.
+//      * @return The data sink with set minimum and preferred resources.
+//      */
+//     @PublicEvolving
+//     public DataSink<T> setResources(ResourceSpec minResources, ResourceSpec 
preferredResources) {
+//             Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
+//             Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
+//             Preconditions.checkArgument(minResources.isValid() && 
+//                             preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
+//                             "The values in resource must be not less than 0 
and the preferred " +
+//                             "resource must be greater than the min 
resource.");
+//
+//             this.minResources = minResources;
+//             this.preferredResources = preferredResources;
+//
+//             return this;
+//     }
+//
+//     /**
+//      * Sets the resources for this data sink. This overrides the default 
resource profile.
+//      *
+//      * @param resources The resources for this data sink.
+//      * @return The data sink with set minimum and preferred resources.
+//      */
+//     @PublicEvolving
+//     public DataSink<T> setResources(ResourceSpec resources) {
+//             Preconditions.checkNotNull(resources, "The resources must be 
not null.");
+//             Preconditions.checkArgument(resources.isValid(), "The resource 
values must be greater than 0.");
+//
+//             this.minResources = resources;
+//             this.preferredResources = resources;
+//
+//             return this;
+//     }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
index cf0a63e..3d327e9 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java
@@ -64,9 +64,9 @@ public class DeltaIteration<ST, WT> {
        
        private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
-       private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+       private ResourceSpec minResources = ResourceSpec.UNKNOWN;
 
-       private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+       private ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
        
        private boolean solutionSetUnManaged;
 
@@ -197,65 +197,72 @@ public class DeltaIteration<ST, WT> {
                return parallelism;
        }
 
-       /**
-        * Sets the minimum and preferred resources for the iteration. This 
overrides the default empty resource.
-        *      The lower and upper resource limits will be considered in 
dynamic resource resize feature for future plan.
-        *
-        * @param minResource The minimum resource for the iteration.
-        * @param preferredResource The preferred resource for the iteration.
-        * @return The iteration with set minimum and preferred resources.
-        */
-       /*
-       public DeltaIteration<ST, WT> setResource(ResourceSpec minResource, 
ResourceSpec preferredResource) {
-               Preconditions.checkNotNull(minResource != null && 
preferredResource != null,
-                               "The min and preferred resources must be not 
null.");
-               Preconditions.checkArgument(minResource.isValid() && 
preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
-                               "The values in resource must be not less than 0 
and the preferred resource must be greater than the min resource.");
-
-               this.minResource = minResource;
-               this.preferredResource = preferredResource;
-
-               return this;
-       }*/
+//     
---------------------------------------------------------------------------
+//      Fine-grained resource profiles are an incomplete work-in-progress 
feature
+//      The setters are hence commented out at this point.
+//     
---------------------------------------------------------------------------
+//
+//     /**
+//      * Sets the minimum and preferred resources for the iteration. This 
overrides the default empty resource.
+//      * The lower and upper resource limits will be considered in dynamic 
resource resize feature for future plan.
+//      *
+//      * @param minResources The minimum resource for the iteration.
+//      * @param preferredResources The preferred resource for the iteration.
+//      * @return The iteration with set minimum and preferred resources.
+//      */
+//     @PublicEvolving
+//     public DeltaIteration<ST, WT> setResource(ResourceSpec minResources, 
ResourceSpec preferredResources) {
+//             Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
+//             Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
+//             Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
+//                             "The values in resources must be not less than 
0 and the preferred resources must be greater than the min resources.");
+//
+//             this.minResources = minResources;
+//             this.preferredResources = preferredResources;
+//
+//             return this;
+//     }
+//
+//     /**
+//      * Sets the resource for the iteration, and the minimum and preferred 
resources are the same by default.
+//      *      The lower and upper resource limits will be considered in 
dynamic resource resize feature for future plan.
+//      *
+//      * @param resources The resource for the iteration.
+//      * @return The iteration with set minimum and preferred resources.
+//      */
+//     @PublicEvolving
+//     public DeltaIteration<ST, WT> setResource(ResourceSpec resources) {
+//             Preconditions.checkNotNull(resources, "The resources must be 
not null.");
+//             Preconditions.checkArgument(resources.isValid(), "The values in 
resource must be not less than 0.");
+//
+//             this.minResources = resources;
+//             this.preferredResources = resources;
+//
+//             return this;
+//     }
 
        /**
-        * Sets the resource for the iteration, and the minimum and preferred 
resources are the same by default.
-        *      The lower and upper resource limits will be considered in 
dynamic resource resize feature for future plan.
-        *
-        * @param resource The resource for the iteration.
-        * @return The iteration with set minimum and preferred resources.
-        */
-       /*
-       public DeltaIteration<ST, WT> setResource(ResourceSpec resource) {
-               Preconditions.checkNotNull(resource != null, "The resource must 
be not null.");
-               Preconditions.checkArgument(resource.isValid(), "The values in 
resource must be not less than 0.");
-
-               this.minResource = resource;
-               this.preferredResource = resource;
-
-               return this;
-       }*/
-
-       /**
-        * Gets the minimum resource from this iteration. If no minimum 
resource has been set,
+        * Gets the minimum resources from this iteration. If no minimum 
resources have been set,
         * it returns the default empty resource.
         *
-        * @return The minimum resource of the iteration.
+        * @return The minimum resources of the iteration.
         */
-       public ResourceSpec getMinResource() {
-               return this.minResource;
+       @PublicEvolving
+       public ResourceSpec getMinResources() {
+               return this.minResources;
        }
 
        /**
-        * Gets the preferred resource from this iteration. If no preferred 
resource has been set,
+        * Gets the preferred resources from this iteration. If no preferred 
resources have been set,
         * it returns the default empty resource.
         *
-        * @return The preferred resource of the iteration.
+        * @return The preferred resources of the iteration.
         */
-       public ResourceSpec getPreferredResource() {
-               return this.preferredResource;
+       @PublicEvolving
+       public ResourceSpec getPreferredResources() {
+               return this.preferredResources;
        }
-       
+
        /**
         * Registers an {@link Aggregator} for the iteration. Aggregators can 
be used to maintain simple statistics during the
         * iteration, such as number of elements processed. The aggregators 
compute global aggregates: After each iteration step,

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index 79cae14..6ae59dd 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -39,9 +39,10 @@ public abstract class Operator<OUT, O extends Operator<OUT, 
O>> extends DataSet<
        
        protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 
-       protected ResourceSpec minResource = ResourceSpec.UNKNOWN;
+       protected ResourceSpec minResources = ResourceSpec.UNKNOWN;
+
+       protected ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
 
-       protected ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
 
        protected Operator(ExecutionEnvironment context, TypeInformation<OUT> 
resultType) {
                super(context, resultType);
@@ -81,8 +82,8 @@ public abstract class Operator<OUT, O extends Operator<OUT, 
O>> extends DataSet<
         *
         * @return The minimum resource of this operator.
         */
-       public ResourceSpec minResource() {
-               return this.minResource;
+       public ResourceSpec getMinResources() {
+               return this.minResources;
        }
 
        /**
@@ -91,8 +92,8 @@ public abstract class Operator<OUT, O extends Operator<OUT, 
O>> extends DataSet<
         *
         * @return The preferred resource of this operator.
         */
-       public ResourceSpec preferredResource() {
-               return this.preferredResource;
+       public ResourceSpec getPreferredResources() {
+               return this.preferredResources;
        }
 
        /**
@@ -129,45 +130,51 @@ public abstract class Operator<OUT, O extends 
Operator<OUT, O>> extends DataSet<
                return returnType;
        }
 
-       /**
-        * Sets the minimum and preferred resources for this operator. This 
overrides the default empty resource.
-        * The lower and upper resource limits will be considered in dynamic 
resource resize feature for future plan.
-        *
-        * @param minResource The minimum resource for this operator.
-        * @param preferredResource The preferred resource for this operator.
-        * @return The operator with set minimum and preferred resources.
-        */
-       /*
-       public O setResource(ResourceSpec minResource, ResourceSpec 
preferredResource) {
-               Preconditions.checkNotNull(minResource != null && 
preferredResource != null,
-                               "The min and preferred resources must be not 
null.");
-               Preconditions.checkArgument(minResource.isValid() && 
preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
-                               "The values in resource must be not less than 0 
and the preferred resource must be greater than the min resource.");
-
-               this.minResource = minResource;
-               this.preferredResource = preferredResource;
-
-               @SuppressWarnings("unchecked")
-               O returnType = (O) this;
-               return returnType;
-       }*/
-
-       /**
-        * Sets the resource for this operator. This overrides the default 
empty minimum and preferred resources.
-        *
-        * @param resource The resource for this operator.
-        * @return The operator with set minimum and preferred resources.
-        */
-       /*
-       public O setResource(ResourceSpec resource) {
-               Preconditions.checkNotNull(resource != null, "The resource must 
be not null.");
-               Preconditions.checkArgument(resource.isValid(), "The resource 
values must be greater than 0.");
-
-               this.minResource = resource;
-               this.preferredResource = resource;
-
-               @SuppressWarnings("unchecked")
-               O returnType = (O) this;
-               return returnType;
-       }*/
+//     
---------------------------------------------------------------------------
+//      Fine-grained resource profiles are an incomplete work-in-progress 
feature
+//      The setters are hence commented out at this point.
+//     
---------------------------------------------------------------------------
+//
+//     /**
+//      * Sets the minimum and preferred resources for this operator. This 
overrides the default empty resource.
+//      * The lower and upper resource limits will be considered in dynamic 
resource resize feature for future plan.
+//      *
+//      * @param minResources The minimum resource for this operator.
+//      * @param preferredResources The preferred resource for this operator.
+//      * @return The operator with set minimum and preferred resources.
+//      */
+//     @PublicEvolving
+//     public O setResources(ResourceSpec minResources, ResourceSpec 
preferredResources) {
+//             Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
+//             Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
+//
+//             Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
+//                             "The values in resource must be not less than 0 
and the preferred resource must be greater than the min resource.");
+//
+//             this.minResources = minResources;
+//             this.preferredResources = preferredResources;
+//
+//             @SuppressWarnings("unchecked")
+//             O returnType = (O) this;
+//             return returnType;
+//     }
+//
+//     /**
+//      * Sets the resources for this operator. This overrides the default 
minimum and preferred resources.
+//      *
+//      * @param resources The resource for this operator.
+//      * @return The operator with set minimum and preferred resources.
+//      */
+//     @PublicEvolving
+//     public O setResources(ResourceSpec resources) {
+//             Preconditions.checkNotNull(resources, "The resource must be not 
null.");
+//             Preconditions.checkArgument(resources.isValid(), "The resource 
values must be greater than 0.");
+//
+//             this.minResources = resources;
+//             this.preferredResources = resources;
+//
+//             @SuppressWarnings("unchecked")
+//             O returnType = (O) this;
+//             return returnType;
+//     }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 909cd32..3bffd8b 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -64,7 +64,7 @@ public class OperatorTranslation {
                // translate the sink itself and connect it to the input
                GenericDataSinkBase<T> translatedSink = 
sink.translateToDataFlow(input);
 
-               translatedSink.setResource(sink.getMinResource(), 
sink.getPreferredResource());
+               translatedSink.setResource(sink.getMinResources(), 
sink.getPreferredResources());
 
                return translatedSink;
        }
@@ -95,29 +95,29 @@ public class OperatorTranslation {
                if (dataSet instanceof DataSource) {
                        DataSource<T> dataSource = (DataSource<T>) dataSet;
                        dataFlowOp = dataSource.translateToDataFlow();
-                       dataFlowOp.setResource(dataSource.minResource(), 
dataSource.preferredResource());
+                       dataFlowOp.setResource(dataSource.getMinResources(), 
dataSource.getPreferredResources());
                }
                else if (dataSet instanceof SingleInputOperator) {
                        SingleInputOperator<?, ?, ?> singleInputOperator = 
(SingleInputOperator<?, ?, ?>) dataSet;
                        dataFlowOp = 
translateSingleInputOperator(singleInputOperator);
-                       dataFlowOp.setResource(singleInputOperator.minResource, 
singleInputOperator.preferredResource());
+                       
dataFlowOp.setResource(singleInputOperator.getMinResources(), 
singleInputOperator.getPreferredResources());
                }
                else if (dataSet instanceof TwoInputOperator) {
                        TwoInputOperator<?, ?, ?, ?> twoInputOperator = 
(TwoInputOperator<?, ?, ?, ?>) dataSet;
                        dataFlowOp = 
translateTwoInputOperator(twoInputOperator);
-                       dataFlowOp.setResource(twoInputOperator.minResource(), 
twoInputOperator.preferredResource());
+                       
dataFlowOp.setResource(twoInputOperator.getMinResources(), 
twoInputOperator.getPreferredResources());
                }
                else if (dataSet instanceof BulkIterationResultSet) {
-                       BulkIterationResultSet bulkIterationResultSet = 
(BulkIterationResultSet<?>) dataSet;
+                       BulkIterationResultSet<?> bulkIterationResultSet = 
(BulkIterationResultSet<?>) dataSet;
                        dataFlowOp = 
translateBulkIteration(bulkIterationResultSet);
-                       
dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().minResource(),
-                                       
bulkIterationResultSet.getIterationHead().preferredResource());
+                       
dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().getMinResources(),
+                                       
bulkIterationResultSet.getIterationHead().getPreferredResources());
                }
                else if (dataSet instanceof DeltaIterationResultSet) {
-                       DeltaIterationResultSet deltaIterationResultSet = 
(DeltaIterationResultSet<?, ?>) dataSet;
+                       DeltaIterationResultSet<?, ?> deltaIterationResultSet = 
(DeltaIterationResultSet<?, ?>) dataSet;
                        dataFlowOp = 
translateDeltaIteration(deltaIterationResultSet);
-                       
dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResource(),
-                                       
deltaIterationResultSet.getIterationHead().getPreferredResource());
+                       
dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResources(),
+                                       
deltaIterationResultSet.getIterationHead().getPreferredResources());
                }
                else if (dataSet instanceof 
DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof 
DeltaIteration.WorksetPlaceHolder) {
                        throw new InvalidProgramException("A data set that is 
part of a delta iteration was used as a sink or action."

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
index 4ef91b3..723c532 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java
@@ -310,12 +310,12 @@ public abstract class PlanNode implements 
Visitable<PlanNode>, DumpableNode<Plan
                return this.parallelism;
        }
 
-       public ResourceSpec getMinResource() {
-               return this.template.getOperator().getMinResource();
+       public ResourceSpec getMinResources() {
+               return this.template.getOperator().getMinResources();
        }
 
-       public ResourceSpec getPreferredResource() {
-               return this.template.getOperator().getPreferredResource();
+       public ResourceSpec getPreferredResources() {
+               return this.template.getOperator().getPreferredResources();
        }
        
        public long getGuaranteedAvailableMemory() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 5cfb601..bfe7567 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -177,48 +177,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
         "parallelism.")
   }
 
-  /**
-   * Sets the minimum and preferred resources of this operation.
-   */
-  /*
-  def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : 
Unit = {
-    javaSet match {
-      case ds: DataSource[_] => ds.setResource(minResource, preferredResource)
-      case op: Operator[_, _] => op.setResource(minResource, preferredResource)
-      case di: DeltaIterationResultSet[_, _] =>
-        di.getIterationHead.setResource(minResource, preferredResource)
-      case _ =>
-        throw new UnsupportedOperationException("Operator does not support " +
-          "configuring custom resources specs.")
-    }
-    this
-  }*/
 
-  /**
-   * Sets the resource of this operation.
-   */
-  /*
-  def resource(resource: ResourceSpec) : Unit = {
-    this.resource(resource, resource)
-  }*/
+// ---------------------------------------------------------------------------
+//  Fine-grained resource profiles are an incomplete work-in-progress feature
+//  The setters are hence commented out at this point.
+// ---------------------------------------------------------------------------
+//  /**
+//   * Sets the minimum and preferred resources of this operation.
+//   */
+//  @PublicEvolving
+//  def resources(minResources: ResourceSpec, preferredResources: 
ResourceSpec) : Unit = {
+//    javaSet match {
+//      case ds: DataSource[_] => ds.setResources(minResources, 
preferredResources)
+//      case op: Operator[_, _] => op.setResources(minResources, 
preferredResources)
+//      case di: DeltaIterationResultSet[_, _] =>
+//        di.getIterationHead.setResources(minResources, preferredResources)
+//      case _ =>
+//        throw new UnsupportedOperationException("Operator does not support " 
+
+//          "configuring custom resources specs.")
+//    }
+//    this
+//  }
+//
+//  /**
+//   * Sets the resource of this operation.
+//   */
+//  @PublicEvolving
+//  def resources(resources: ResourceSpec) : Unit = {
+//    this.resources(resources, resources)
+//  }
 
   /**
-   * Returns the minimum resource of this operation.
+   * Returns the minimum resources of this operation.
    */
-  def minResource: ResourceSpec = javaSet match {
-    case ds: DataSource[_] => ds.minResource()
-    case op: Operator[_, _] => op.minResource
+  @PublicEvolving
+  def minResources: ResourceSpec = javaSet match {
+    case ds: DataSource[_] => ds.getMinResources()
+    case op: Operator[_, _] => op.getMinResources()
     case _ =>
       throw new UnsupportedOperationException("Operator does not support " +
         "configuring custom resources specs.")
   }
 
   /**
-   * Returns the preferred resource of this operation.
+   * Returns the preferred resources of this operation.
    */
-  def preferredResource: ResourceSpec = javaSet match {
-    case ds: DataSource[_] => ds.preferredResource()
-    case op: Operator[_, _] => op.preferredResource
+  @PublicEvolving
+  def preferredResources: ResourceSpec = javaSet match {
+    case ds: DataSource[_] => ds.getPreferredResources()
+    case op: Operator[_, _] => op.getPreferredResources()
     case _ =>
       throw new UnsupportedOperationException("Operator does not support " +
         "configuring custom resources specs.")

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index ae1c39a..c443758 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -144,21 +144,23 @@ public class DataStream<T> {
        }
 
        /**
-        * Gets the minimum resource for this operator.
+        * Gets the minimum resources for this operator.
         *
-        * @return The minimum resource set for this operator.
+        * @return The minimum resources set for this operator.
         */
-       public ResourceSpec minResource() {
-               return transformation.getMinResource();
+       @PublicEvolving
+       public ResourceSpec getMinResources() {
+               return transformation.getMinResources();
        }
 
        /**
-        * Gets the preferred resource for this operator.
+        * Gets the preferred resources for this operator.
         *
-        * @return The preferred resource set for this operator.
+        * @return The preferred resources set for this operator.
         */
-       public ResourceSpec preferredResource() {
-               return transformation.getPreferredResource();
+       @PublicEvolving
+       public ResourceSpec getPreferredResources() {
+               return transformation.getPreferredResources();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
index 69e21d6..39d81c6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.streaming.api.transformations.SinkTransformation;
 @Public
 public class DataStreamSink<T> {
 
-       SinkTransformation<T> transformation;
+       private final SinkTransformation<T> transformation;
 
        @SuppressWarnings("unchecked")
        protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> 
operator) {
@@ -113,41 +113,45 @@ public class DataStreamSink<T> {
                return this;
        }
 
-       /**
-        * Sets the minimum and preferred resources for this sink, and the 
lower and upper resource limits will
-        * be considered in resource resize feature for future plan.
-        *
-        * @param minResource The minimum resource for this sink.
-        * @param preferredResource The preferred resource for this sink
-        * @return The sink with set minimum and preferred resources.
-        */
-       /*
-       public DataStreamSink<T> setResource(ResourceSpec minResource, 
ResourceSpec preferredResource) {
-               Preconditions.checkNotNull(minResource != null && 
preferredResource != null,
-                               "The min and preferred resources must be not 
null.");
-               Preconditions.checkArgument(minResource.isValid() && 
preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
-                               "The values in resource must be not less than 0 
and the preferred resource must be greater than the min resource.");
-
-               transformation.setResource(minResource, preferredResource);
-
-               return this;
-       }*/
-
-       /**
-        * Sets the resource for this sink, the minimum and preferred resources 
are the same by default.
-        *
-        * @param resource The resource for this sink.
-        * @return The sink with set minimum and preferred resources.
-        */
-       /*
-       public DataStreamSink<T> setResource(ResourceSpec resource) {
-               Preconditions.checkNotNull(resource != null, "The resource must 
be not null.");
-               Preconditions.checkArgument(resource.isValid(), "The resource 
values must be greater than 0.");
-
-               transformation.setResource(resource, resource);
-
-               return this;
-       }*/
+//     
---------------------------------------------------------------------------
+//      Fine-grained resource profiles are an incomplete work-in-progress 
feature
+//      The setters are hence commented out at this point.
+//     
---------------------------------------------------------------------------
+//     /**
+//      * Sets the minimum and preferred resources for this sink, and the 
lower and upper resource limits will
+//      * be considered in resource resize feature for future plan.
+//      *
+//      * @param minResources The minimum resources for this sink.
+//      * @param preferredResources The preferred resources for this sink
+//      * @return The sink with set minimum and preferred resources.
+//      */
+//     @PublicEvolving
+//     public DataStreamSink<T> setResources(ResourceSpec minResources, 
ResourceSpec preferredResources) {
+//             Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
+//             Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
+//             Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && 
minResources.lessThanOrEqual(preferredResources),
+//                             "The values in resource must be not less than 0 
and the preferred resource must be greater than the min resource.");
+//
+//             transformation.setResources(minResources, preferredResources);
+//
+//             return this;
+//     }
+//
+//     /**
+//      * Sets the resource for this sink, the minimum and preferred resources 
are the same by default.
+//      *
+//      * @param resources The resource for this sink.
+//      * @return The sink with set minimum and preferred resources.
+//      */
+//     @PublicEvolving
+//     public DataStreamSink<T> setResources(ResourceSpec resources) {
+//             Preconditions.checkNotNull(resources, "The resource must be not 
null.");
+//             Preconditions.checkArgument(resources.isValid(), "The resource 
values must be greater than 0.");
+//
+//             transformation.setResources(resources, resources);
+//
+//             return this;
+//     }
 
        /**
         * Turns off chaining for this operator so thread co-location will not 
be

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index d856603..859c6d5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -154,41 +154,45 @@ public class SingleOutputStreamOperator<T> extends 
DataStream<T> {
                return this;
        }
 
-       /**
-        * Sets the minimum and preferred resources for this operator, and the 
lower and upper resource limits will
-        * be considered in dynamic resource resize feature for future plan.
-        *
-        * @param minResource The minimum resource for this operator.
-        * @param preferredResource The preferred resource for this operator.
-        * @return The operator with set minimum and preferred resources.
-        */
-       /*
-       public SingleOutputStreamOperator<T> setResource(ResourceSpec 
minResource, ResourceSpec preferredResource) {
-               Preconditions.checkArgument(minResource != null && 
preferredResource != null,
-                               "The min and preferred resources must be not 
null.");
-               Preconditions.checkArgument(minResource.isValid() && 
preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource),
-                               "The values in resource must be not less than 0 
and the preferred resource must be greater than the min resource.");
-
-               transformation.setResource(minResource, preferredResource);
-
-               return this;
-       }*/
-
-       /**
-        * Sets the resource for this operator, the minimum and preferred 
resources are the same by default.
-        *
-        * @param resource The resource for this operator.
-        * @return The operator with set minimum and preferred resources.
-        */
-       /*
-       public SingleOutputStreamOperator<T> setResource(ResourceSpec resource) 
{
-               Preconditions.checkNotNull(resource != null, "The resource must 
be not null.");
-               Preconditions.checkArgument(resource.isValid(), "The resource 
values must be greater than 0.");
-
-               transformation.setResource(resource, resource);
-
-               return this;
-       }*/
+//     
---------------------------------------------------------------------------
+//      Fine-grained resource profiles are an incomplete work-in-progress 
feature
+//      The setters are hence commented out at this point.
+//     
---------------------------------------------------------------------------
+//     /**
+//      * Sets the minimum and preferred resources for this operator, and the 
lower and upper resource limits will
+//      * be considered in dynamic resource resize feature for future plan.
+//      *
+//      * @param minResources The minimum resources for this operator.
+//      * @param preferredResources The preferred resources for this operator.
+//      * @return The operator with set minimum and preferred resources.
+//      */
+//     @PublicEvolving
+//     public SingleOutputStreamOperator<T> setResources(ResourceSpec 
minResources, ResourceSpec preferredResources) {
+//             Preconditions.checkNotNull(minResources, "The min resources 
must be not null.");
+//             Preconditions.checkNotNull(preferredResources, "The preferred 
resources must be not null.");
+//             Preconditions.checkArgument(minResources.isValid() && 
preferredResources.isValid() && minResource.lessThanOrEqual(preferredResources),
+//                             "The values in resource must be not less than 0 
and the preferred resource must be greater than the min resource.");
+//
+//             transformation.setResources(minResources, preferredResources);
+//
+//             return this;
+//     }
+//
+//     /**
+//      * Sets the resources for this operator, the minimum and preferred 
resources are the same by default.
+//      *
+//      * @param resources The resources for this operator.
+//      * @return The operator with set minimum and preferred resources.
+//      */
+//     @PublicEvolving
+//     public SingleOutputStreamOperator<T> setResources(ResourceSpec 
resources) {
+//             Preconditions.checkNotNull(resources, "The resource must be not 
null.");
+//             Preconditions.checkArgument(resources.isValid(), "The resource 
values must be greater than 0.");
+//
+//             transformation.setResources(resources, resources);
+//
+//             return this;
+//     }
 
        private boolean canBeParallel() {
                return !nonParallel;

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index fcbc607..a87e63d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -416,7 +416,7 @@ public class StreamGraph extends StreamingPlan {
 
        public void setResource(int vertexID, ResourceSpec minResource, 
ResourceSpec preferredResource) {
                if (getStreamNode(vertexID) != null) {
-                       getStreamNode(vertexID).setResource(minResource, 
preferredResource);
+                       getStreamNode(vertexID).setResources(minResource, 
preferredResource);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index af92421..f55ff47 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -202,8 +202,8 @@ public class StreamGraphGenerator {
                        
streamGraph.setTransformationUserHash(transform.getId(), 
transform.getUserProvidedNodeHash());
                }
 
-               if (transform.getMinResource() != null && 
transform.getPreferredResource() != null) {
-                       streamGraph.setResource(transform.getId(), 
transform.getMinResource(), transform.getPreferredResource());
+               if (transform.getMinResources() != null && 
transform.getPreferredResources() != null) {
+                       streamGraph.setResource(transform.getId(), 
transform.getMinResources(), transform.getPreferredResources());
                }
 
                return transformedIds;

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 0bf9adf..2d2e1e75 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -49,8 +49,8 @@ public class StreamNode implements Serializable {
         * dynamic scaling and the number of key groups used for partitioned 
state.
         */
        private int maxParallelism;
-       private ResourceSpec minResource;
-       private ResourceSpec preferredResource;
+       private ResourceSpec minResources;
+       private ResourceSpec preferredResources;
        private Long bufferTimeout = null;
        private final String operatorName;
        private String slotSharingGroup;
@@ -168,17 +168,17 @@ public class StreamNode implements Serializable {
                this.maxParallelism = maxParallelism;
        }
 
-       public ResourceSpec getMinResource() {
-               return minResource;
+       public ResourceSpec getMinResources() {
+               return minResources;
        }
 
-       public ResourceSpec getPreferredResource() {
-               return preferredResource;
+       public ResourceSpec getPreferredResources() {
+               return preferredResources;
        }
 
-       public void setResource(ResourceSpec minResource, ResourceSpec 
preferredResource) {
-               this.minResource = minResource;
-               this.preferredResource = preferredResource;
+       public void setResources(ResourceSpec minResources, ResourceSpec 
preferredResources) {
+               this.minResources = minResources;
+               this.preferredResources = preferredResources;
        }
 
        public Long getBufferTimeout() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
index 1d22454..24b5736 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
@@ -30,6 +30,8 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@code StreamTransformation} represents the operation that creates a
  * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
@@ -127,16 +129,16 @@ public abstract class StreamTransformation<T> {
        private int maxParallelism = -1;
 
        /**
-        *  The minimum resource for this stream transformation. It defines the 
lower limit for
-        *  dynamic resource resize in future plan.
+        *  The minimum resources for this stream transformation. It defines 
the lower limit for
+        *  dynamic resources resize in future plan.
         */
-       private ResourceSpec minResource = ResourceSpec.UNKNOWN;
+       private ResourceSpec minResources = ResourceSpec.UNKNOWN;
 
        /**
-        *  The preferred resource for this stream transformation. It defines 
the upper limit for
+        *  The preferred resources for this stream transformation. It defines 
the upper limit for
         *  dynamic resource resize in future plan.
         */
-       private ResourceSpec preferredResource = ResourceSpec.UNKNOWN;
+       private ResourceSpec preferredResources = ResourceSpec.UNKNOWN;
 
        /**
         * User-specified ID for this transformation. This is used to assign the
@@ -229,12 +231,12 @@ public abstract class StreamTransformation<T> {
        /**
         * Sets the minimum and preferred resources for this stream 
transformation.
         *
-        * @param minResource The minimum resource of this transformation.
-        * @param preferredResource The preferred resource of this 
transformation.
+        * @param minResources The minimum resource of this transformation.
+        * @param preferredResources The preferred resource of this 
transformation.
         */
-       public void setResource(ResourceSpec minResource, ResourceSpec 
preferredResource) {
-               this.minResource = minResource;
-               this.preferredResource = preferredResource;
+       public void setResources(ResourceSpec minResources, ResourceSpec 
preferredResources) {
+               this.minResources = checkNotNull(minResources);
+               this.preferredResources = checkNotNull(preferredResources);
        }
 
        /**
@@ -242,8 +244,8 @@ public abstract class StreamTransformation<T> {
         *
         * @return The minimum resource of this transformation.
         */
-       public ResourceSpec getMinResource() {
-               return minResource;
+       public ResourceSpec getMinResources() {
+               return minResources;
        }
 
        /**
@@ -251,8 +253,8 @@ public abstract class StreamTransformation<T> {
         *
         * @return The preferred resource of this transformation.
         */
-       public ResourceSpec getPreferredResource() {
-               return preferredResource;
+       public ResourceSpec getPreferredResources() {
+               return preferredResources;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index e42fb3f..35e1f23 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -146,36 +146,42 @@ class DataStream[T](stream: JavaStream[T]) {
   }
 
   /**
-   * Returns the minimum resource of this operation.
+   * Returns the minimum resources of this operation.
    */
-  def minResource: ResourceSpec = stream.minResource()
-
-  /**
-   * Returns the preferred resource of this operation.
-   */
-  def preferredResource: ResourceSpec = stream.preferredResource()
-
-  /**
-   * Sets the minimum and preferred resources of this operation.
-   */
-  /*
-  def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : 
DataStream[T] =
-    stream match {
-      case stream : SingleOutputStreamOperator[T] => 
asScalaStream(stream.setResource(
-        minResource, preferredResource))
-      case _ =>
-        throw new UnsupportedOperationException("Operator does not support " +
-          "configuring custom resources specs.")
-      this
-  }*/
+  @PublicEvolving
+  def minResources: ResourceSpec = stream.getMinResources()
 
   /**
-   * Sets the resource of this operation.
+   * Returns the preferred resources of this operation.
    */
-  /*
-  def resource(resource: ResourceSpec) : Unit = {
-    this.resource(resource, resource)
-  }*/
+  @PublicEvolving
+  def preferredResources: ResourceSpec = stream.getPreferredResources()
+
+// ---------------------------------------------------------------------------
+//  Fine-grained resource profiles are an incomplete work-in-progress feature
+//  The setters are hence commented out at this point.
+// ---------------------------------------------------------------------------
+//  /**
+//   * Sets the minimum and preferred resources of this operation.
+//   */
+//  @PublicEvolving
+//  def resources(minResources: ResourceSpec, preferredResources: 
ResourceSpec) : DataStream[T] =
+//    stream match {
+//      case stream : SingleOutputStreamOperator[T] => asScalaStream(
+//        stream.setResources(minResources, preferredResources))
+//      case _ =>
+//        throw new UnsupportedOperationException("Operator does not support " 
+
+//          "configuring custom resources specs.")
+//      this
+//  }
+//
+//  /**
+//   * Sets the resource of this operation.
+//   */
+//  @PublicEvolving
+//  def resources(resources: ResourceSpec) : Unit = {
+//    this.resources(resources, resources)
+//  }
 
   /**
    * Gets the name of the current data stream. This name is

http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
index 907ad9f..7abb392 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala
@@ -41,21 +41,6 @@ abstract class ScalaAPICompletenessTestBase extends 
TestLogger {
   protected def isExcludedByName(method: Method): Boolean
 
   /**
-   * Determines whether a method is excluded by an interface it uses.
-   */
-  protected def isExcludedByInterface(method: Method): Boolean = {
-    val excludedInterfaces =
-      Set("org.apache.spark.Logging", 
"org.apache.hadoop.mapreduce.HadoopMapReduceUtil")
-    def toComparisionKey(method: Method) =
-      (method.getReturnType, method.getName, method.getGenericReturnType)
-    val interfaces = method.getDeclaringClass.getInterfaces.filter { i =>
-      excludedInterfaces.contains(i.getName)
-    }
-    val excludedMethods = 
interfaces.flatMap(_.getMethods.map(toComparisionKey))
-    excludedMethods.contains(toComparisionKey(method))
-  }
-
-  /**
    * Utility to be called during the test.
    */
   protected def checkMethods(
@@ -66,26 +51,33 @@ abstract class ScalaAPICompletenessTestBase extends 
TestLogger {
     val javaMethods = javaClass.getMethods
       .filterNot(_.isAccessible)
       .filterNot(isExcludedByName)
-      .filterNot(isExcludedByInterface)
       .map(m => m.getName).toSet
 
     val scalaMethods = scalaClass.getMethods
       .filterNot(_.isAccessible)
       .filterNot(isExcludedByName)
-      .filterNot(isExcludedByInterface)
       .map(m => m.getName).toSet
 
     val missingMethods = javaMethods -- scalaMethods
 
-    for (method <- missingMethods) {
-      fail("Method " + method + " from " + javaClass + " is missing from " + 
scalaClassName + ".")
+    for (javaMethod <- missingMethods) {
+      // check if the method simply follows different getter / setter 
conventions in Scala / Java
+      // for example Java: getFoo() should match Scala: foo()
+      if (!containsScalaGetterLike(javaMethod, scalaMethods)) {
+        fail(s"Method $javaMethod from $javaClass is missing from 
$scalaClassName.")
+      }
     }
   }
 
-  protected def checkEquality(scalaInstance: AnyRef, extractJavaFun : 
((AnyRef) => AnyRef)) {
-    val javaInstance = extractJavaFun(scalaInstance)
+  protected def containsScalaGetterLike(javaMethod: String, scalaMethods: 
Set[String]): Boolean = {
+    if (javaMethod.startsWith("get") && javaMethod.length >= 4) {
+      val scalaMethodName = Character.toLowerCase(javaMethod.charAt(3)) + 
javaMethod.substring(4)
+      scalaMethods.contains(scalaMethodName)
+    } else {
+      false
+    }
   }
-
+  
   /**
    * Tests to be performed to ensure API completeness.
    */

Reply via email to