[FLINK-5133] [core] Followups for ResourceSpec on DataSet / DataStream API - Correct some uses of Preconditions.checkNotNull - Make 'resources' plural in all cases - Add comments explaining why the setters are commented out - Add @PublicEvolving annotations - Make the Scala API completeness test match Scala-esque versions of Java getters
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9912de21 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9912de21 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9912de21 Branch: refs/heads/master Commit: 9912de21a1053013a220707f8b3868bdbf93aaca Parents: f37ed02 Author: Stephan Ewen <[email protected]> Authored: Tue Feb 28 10:42:16 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Tue Feb 28 18:59:10 2017 +0100 ---------------------------------------------------------------------- .../flink/api/common/operators/Operator.java | 41 ++++--- .../api/common/operators/ResourceSpec.java | 29 ++--- .../flink/api/java/operators/DataSink.java | 109 ++++++++++--------- .../api/java/operators/DeltaIteration.java | 105 +++++++++--------- .../flink/api/java/operators/Operator.java | 101 +++++++++-------- .../api/java/operators/OperatorTranslation.java | 20 ++-- .../apache/flink/optimizer/plan/PlanNode.java | 8 +- .../org/apache/flink/api/scala/DataSet.scala | 69 ++++++------ .../streaming/api/datastream/DataStream.java | 18 +-- .../api/datastream/DataStreamSink.java | 76 +++++++------ .../datastream/SingleOutputStreamOperator.java | 74 +++++++------ .../flink/streaming/api/graph/StreamGraph.java | 2 +- .../api/graph/StreamGraphGenerator.java | 4 +- .../flink/streaming/api/graph/StreamNode.java | 18 +-- .../transformations/StreamTransformation.java | 30 ++--- .../flink/streaming/api/scala/DataStream.scala | 58 +++++----- .../ScalaAPICompletenessTestBase.scala | 36 +++--- 17 files changed, 424 insertions(+), 374 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java ---------------------------------------------------------------------- diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java index a9dedfa..1905555 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java @@ -16,18 +16,20 @@ * limitations under the License. */ - package org.apache.flink.api.common.operators; import java.util.List; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.util.UserCodeWrapper; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Visitable; +import javax.annotation.Nullable; + /** * Abstract base class for all operators. An operator is a source, sink, or it applies an operation to * one or more inputs, producing a result. @@ -45,9 +47,11 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> { private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances to use - private ResourceSpec minResource; // the minimum resource of the contract instance. + @Nullable + private ResourceSpec minResources; // the minimum resource of the contract instance. - private ResourceSpec preferredResource; // the preferred resource of the contract instance. + @Nullable + private ResourceSpec preferredResources; // the preferred resource of the contract instance. /** * The return type of the user function. @@ -190,35 +194,40 @@ public abstract class Operator<OUT> implements Visitable<Operator<?>> { } /** - * Gets the minimum resource for this contract instance. The minimum resource denotes how many - * resources will be needed in the minimum for the user function during the execution. + * Gets the minimum resources for this operator. 
The minimum resources denote how many + * resources will be needed at minimum for the operator or user function during the execution. * - * @return The minimum resource of this operator. + * @return The minimum resources of this operator. */ - public ResourceSpec getMinResource() { - return this.minResource; + @Nullable + @PublicEvolving + public ResourceSpec getMinResources() { + return this.minResources; } /** - * Gets the preferred resource for this contract instance. The preferred resource denotes how many + * Gets the preferred resources for this contract instance. The preferred resources denote how many * resources will be needed in the maximum for the user function during the execution. * * @return The preferred resource of this operator. */ - public ResourceSpec getPreferredResource() { - return this.preferredResource; + @Nullable + @PublicEvolving + public ResourceSpec getPreferredResources() { + return this.preferredResources; } /** * Sets the minimum and preferred resources for this contract instance. The resource denotes * how many memories and cpu cores of the user function will be consumed during the execution. * - * @param minResource The minimum resource of this operator. - * @param preferredResource The preferred resource of this operator. + * @param minResources The minimum resource of this operator. + * @param preferredResources The preferred resource of this operator. 
*/ - public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) { - this.minResource = minResource; - this.preferredResource = preferredResource; + @PublicEvolving + public void setResource(ResourceSpec minResources, ResourceSpec preferredResources) { + this.minResources = minResources; + this.preferredResources = preferredResources; } http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java index 1387508..0ea289a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java @@ -105,14 +105,12 @@ public class ResourceSpec implements Serializable { * @return The new resource with merged values. */ public ResourceSpec merge(ResourceSpec other) { - ResourceSpec result = new ResourceSpec( + return new ResourceSpec( Math.max(this.cpuCores, other.cpuCores), this.heapMemoryInMB + other.heapMemoryInMB, this.directMemoryInMB + other.directMemoryInMB, this.nativeMemoryInMB + other.nativeMemoryInMB, this.stateSizeInMB + other.stateSizeInMB); - - return result; } public double getCpuCores() { @@ -141,12 +139,8 @@ public class ResourceSpec implements Serializable { * @return True if all the values are equal or greater than 0, otherwise false. 
*/ public boolean isValid() { - if (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 && - this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0) { - return true; - } else { - return false; - } + return (this.cpuCores >= 0 && this.heapMemoryInMB >= 0 && this.directMemoryInMB >= 0 && + this.nativeMemoryInMB >= 0 && this.stateSizeInMB >= 0); } /** @@ -162,11 +156,7 @@ public class ResourceSpec implements Serializable { int cmp3 = Integer.compare(this.directMemoryInMB, other.directMemoryInMB); int cmp4 = Integer.compare(this.nativeMemoryInMB, other.nativeMemoryInMB); int cmp5 = Integer.compare(this.stateSizeInMB, other.stateSizeInMB); - if (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0) { - return true; - } else { - return false; - } + return (cmp1 <= 0 && cmp2 <= 0 && cmp3 <= 0 && cmp4 <= 0 && cmp5 <= 0); } @Override @@ -186,6 +176,17 @@ public class ResourceSpec implements Serializable { } @Override + public int hashCode() { + final long cpuBits = Double.doubleToLongBits(cpuCores); + int result = (int) (cpuBits ^ (cpuBits >>> 32)); + result = 31 * result + heapMemoryInMB; + result = 31 * result + directMemoryInMB; + result = 31 * result + nativeMemoryInMB; + result = 31 * result + stateSizeInMB; + return result; + } + + @Override public String toString() { return "ResourceSpec{" + "cpuCores=" + cpuCores + http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java index 3be9cc0..369e013 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java @@ -52,9 +52,9 @@ public class DataSink<T> { private int parallelism 
= ExecutionConfig.PARALLELISM_DEFAULT; - private ResourceSpec minResource = ResourceSpec.UNKNOWN; + private ResourceSpec minResources = ResourceSpec.UNKNOWN; - private ResourceSpec preferredResource = ResourceSpec.UNKNOWN; + private ResourceSpec preferredResources = ResourceSpec.UNKNOWN; private Configuration parameters; @@ -285,61 +285,70 @@ public class DataSink<T> { } /** - * Returns the minimum resource of this data sink. If no minimum resource has been set, - * it returns the default empty resource. + * Returns the minimum resources of this data sink. If no minimum resources have been set, + * this returns the default resource profile. * - * @return The minimum resource of this data sink. + * @return The minimum resources of this data sink. */ - public ResourceSpec getMinResource() { - return this.minResource; + @PublicEvolving + public ResourceSpec getMinResources() { + return this.minResources; } /** - * Returns the preferred resource of this data sink. If no preferred resource has been set, - * it returns the default empty resource. + * Returns the preferred resources of this data sink. If no preferred resources have been set, + * this returns the default resource profile. * - * @return The preferred resource of this data sink. + * @return The preferred resources of this data sink. */ - public ResourceSpec getPreferredResource() { - return this.preferredResource; + @PublicEvolving + public ResourceSpec getPreferredResources() { + return this.preferredResources; } - /** - * Sets the minimum and preferred resources for this data sink. This overrides the default empty resource. - * The minimum resource must be satisfied and the preferred resource specifies the upper bound - * for dynamic resource resize. - * - * @param minResource The minimum resource for this data sink. - * @param preferredResource The preferred resource for this data sink. - * @return The data sink with set minimum and preferred resources. 
- */ - /* - public DataSink<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) { - Preconditions.checkNotNull(minResource != null && preferredResource != null, - "The min and preferred resources must be not null."); - Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), - "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); - - this.minResource = minResource; - this.preferredResource = preferredResource; - - return this; - }*/ - - /** - * Sets the resource for this data sink. This overrides the default empty minimum and preferred resources. - * - * @param resource The resource for this data sink. - * @return The data sink with set minimum and preferred resources. - */ - /* - public DataSink<T> setResource(ResourceSpec resource) { - Preconditions.checkNotNull(resource != null, "The resource must be not null."); - Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0."); - - this.minResource = resource; - this.preferredResource = resource; - - return this; - }*/ +// --------------------------------------------------------------------------- +// Fine-grained resource profiles are an incomplete work-in-progress feature +// The setters are hence commented out at this point. +// --------------------------------------------------------------------------- +// +// /** +// * Sets the minimum and preferred resources for this data sink. This overrides the default empty resource. +// * The minimum resource must be satisfied and the preferred resource specifies the upper bound +// * for dynamic resource resize. +// * +// * @param minResources The minimum resource for this data sink. +// * @param preferredResources The preferred resource for this data sink. +// * @return The data sink with set minimum and preferred resources. 
+// */ +// @PublicEvolving +// public DataSink<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) { +// Preconditions.checkNotNull(minResources, "The min resources must be not null."); +// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); +// Preconditions.checkArgument(minResources.isValid() && +// preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), +// "The values in resource must be not less than 0 and the preferred " + +// "resource must be greater than the min resource."); +// +// this.minResources = minResources; +// this.preferredResources = preferredResources; +// +// return this; +// } +// +// /** +// * Sets the resources for this data sink. This overrides the default resource profile. +// * +// * @param resources The resources for this data sink. +// * @return The data sink with set minimum and preferred resources. +// */ +// @PublicEvolving +// public DataSink<T> setResources(ResourceSpec resources) { +// Preconditions.checkNotNull(resources, "The resources must be not null."); +// Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0."); +// +// this.minResources = resources; +// this.preferredResources = resources; +// +// return this; +// } } http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java index cf0a63e..3d327e9 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DeltaIteration.java @@ -64,9 +64,9 @@ public class DeltaIteration<ST, WT> { private int parallelism = 
ExecutionConfig.PARALLELISM_DEFAULT; - private ResourceSpec minResource = ResourceSpec.UNKNOWN; + private ResourceSpec minResources = ResourceSpec.UNKNOWN; - private ResourceSpec preferredResource = ResourceSpec.UNKNOWN; + private ResourceSpec preferredResources = ResourceSpec.UNKNOWN; private boolean solutionSetUnManaged; @@ -197,65 +197,72 @@ public class DeltaIteration<ST, WT> { return parallelism; } - /** - * Sets the minimum and preferred resources for the iteration. This overrides the default empty resource. - * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. - * - * @param minResource The minimum resource for the iteration. - * @param preferredResource The preferred resource for the iteration. - * @return The iteration with set minimum and preferred resources. - */ - /* - public DeltaIteration<ST, WT> setResource(ResourceSpec minResource, ResourceSpec preferredResource) { - Preconditions.checkNotNull(minResource != null && preferredResource != null, - "The min and preferred resources must be not null."); - Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), - "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); - - this.minResource = minResource; - this.preferredResource = preferredResource; - - return this; - }*/ +// --------------------------------------------------------------------------- +// Fine-grained resource profiles are an incomplete work-in-progress feature +// The setters are hence commented out at this point. +// --------------------------------------------------------------------------- +// +// /** +// * Sets the minimum and preferred resources for the iteration. This overrides the default empty resource. +// * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. 
+// * +// * @param minResources The minimum resource for the iteration. +// * @param preferredResources The preferred resource for the iteration. +// * @return The iteration with set minimum and preferred resources. +// */ +// @PublicEvolving +// public DeltaIteration<ST, WT> setResource(ResourceSpec minResources, ResourceSpec preferredResources) { +// Preconditions.checkNotNull(minResources, "The min resources must be not null."); +// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); +// Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), +// "The values in resources must be not less than 0 and the preferred resources must be greater than the min resources."); +// +// this.minResources = minResources; +// this.preferredResources = preferredResources; +// +// return this; +// } +// +// /** +// * Sets the resource for the iteration, and the minimum and preferred resources are the same by default. +// * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. +// * +// * @param resources The resource for the iteration. +// * @return The iteration with set minimum and preferred resources. +// */ +// @PublicEvolving +// public DeltaIteration<ST, WT> setResource(ResourceSpec resources) { +// Preconditions.checkNotNull(resources, "The resources must be not null."); +// Preconditions.checkArgument(resources.isValid(), "The values in resource must be not less than 0."); +// +// this.minResources = resources; +// this.preferredResources = resources; +// +// return this; +// } /** - * Sets the resource for the iteration, and the minimum and preferred resources are the same by default. - * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. - * - * @param resource The resource for the iteration. 
- * @return The iteration with set minimum and preferred resources. - */ - /* - public DeltaIteration<ST, WT> setResource(ResourceSpec resource) { - Preconditions.checkNotNull(resource != null, "The resource must be not null."); - Preconditions.checkArgument(resource.isValid(), "The values in resource must be not less than 0."); - - this.minResource = resource; - this.preferredResource = resource; - - return this; - }*/ - - /** - * Gets the minimum resource from this iteration. If no minimum resource has been set, + * Gets the minimum resources from this iteration. If no minimum resources have been set, * it returns the default empty resource. * - * @return The minimum resource of the iteration. + * @return The minimum resources of the iteration. */ - public ResourceSpec getMinResource() { - return this.minResource; + @PublicEvolving + public ResourceSpec getMinResources() { + return this.minResources; } /** - * Gets the preferred resource from this iteration. If no preferred resource has been set, + * Gets the preferred resources from this iteration. If no preferred resources have been set, * it returns the default empty resource. * - * @return The preferred resource of the iteration. + * @return The preferred resources of the iteration. */ - public ResourceSpec getPreferredResource() { - return this.preferredResource; + @PublicEvolving + public ResourceSpec getPreferredResources() { + return this.preferredResources; } - + /** * Registers an {@link Aggregator} for the iteration. Aggregators can be used to maintain simple statistics during the * iteration, such as number of elements processed. 
The aggregators compute global aggregates: After each iteration step, http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java index 79cae14..6ae59dd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java @@ -39,9 +39,10 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet< protected int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; - protected ResourceSpec minResource = ResourceSpec.UNKNOWN; + protected ResourceSpec minResources = ResourceSpec.UNKNOWN; + + protected ResourceSpec preferredResources = ResourceSpec.UNKNOWN; - protected ResourceSpec preferredResource = ResourceSpec.UNKNOWN; protected Operator(ExecutionEnvironment context, TypeInformation<OUT> resultType) { super(context, resultType); @@ -81,8 +82,8 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet< * * @return The minimum resource of this operator. */ - public ResourceSpec minResource() { - return this.minResource; + public ResourceSpec getMinResources() { + return this.minResources; } /** @@ -91,8 +92,8 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet< * * @return The preferred resource of this operator. */ - public ResourceSpec preferredResource() { - return this.preferredResource; + public ResourceSpec getPreferredResources() { + return this.preferredResources; } /** @@ -129,45 +130,51 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet< return returnType; } - /** - * Sets the minimum and preferred resources for this operator. 
This overrides the default empty resource. - * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. - * - * @param minResource The minimum resource for this operator. - * @param preferredResource The preferred resource for this operator. - * @return The operator with set minimum and preferred resources. - */ - /* - public O setResource(ResourceSpec minResource, ResourceSpec preferredResource) { - Preconditions.checkNotNull(minResource != null && preferredResource != null, - "The min and preferred resources must be not null."); - Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), - "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); - - this.minResource = minResource; - this.preferredResource = preferredResource; - - @SuppressWarnings("unchecked") - O returnType = (O) this; - return returnType; - }*/ - - /** - * Sets the resource for this operator. This overrides the default empty minimum and preferred resources. - * - * @param resource The resource for this operator. - * @return The operator with set minimum and preferred resources. - */ - /* - public O setResource(ResourceSpec resource) { - Preconditions.checkNotNull(resource != null, "The resource must be not null."); - Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0."); - - this.minResource = resource; - this.preferredResource = resource; - - @SuppressWarnings("unchecked") - O returnType = (O) this; - return returnType; - }*/ +// --------------------------------------------------------------------------- +// Fine-grained resource profiles are an incomplete work-in-progress feature +// The setters are hence commented out at this point. 
+// --------------------------------------------------------------------------- +// +// /** +// * Sets the minimum and preferred resources for this operator. This overrides the default empty resource. +// * The lower and upper resource limits will be considered in dynamic resource resize feature for future plan. +// * +// * @param minResources The minimum resource for this operator. +// * @param preferredResources The preferred resource for this operator. +// * @return The operator with set minimum and preferred resources. +// */ +// @PublicEvolving +// public O setResources(ResourceSpec minResources, ResourceSpec preferredResources) { +// Preconditions.checkNotNull(minResources, "The min resources must be not null."); +// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); +// +// Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), +// "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); +// +// this.minResources = minResources; +// this.preferredResources = preferredResources; +// +// @SuppressWarnings("unchecked") +// O returnType = (O) this; +// return returnType; +// } +// +// /** +// * Sets the resources for this operator. This overrides the default minimum and preferred resources. +// * +// * @param resources The resource for this operator. +// * @return The operator with set minimum and preferred resources. 
+// */ +// @PublicEvolving +// public O setResources(ResourceSpec resources) { +// Preconditions.checkNotNull(resources, "The resource must be not null."); +// Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0."); +// +// this.minResources = resources; +// this.preferredResources = resources; +// +// @SuppressWarnings("unchecked") +// O returnType = (O) this; +// return returnType; +// } } http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java index 909cd32..3bffd8b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java @@ -64,7 +64,7 @@ public class OperatorTranslation { // translate the sink itself and connect it to the input GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input); - translatedSink.setResource(sink.getMinResource(), sink.getPreferredResource()); + translatedSink.setResource(sink.getMinResources(), sink.getPreferredResources()); return translatedSink; } @@ -95,29 +95,29 @@ public class OperatorTranslation { if (dataSet instanceof DataSource) { DataSource<T> dataSource = (DataSource<T>) dataSet; dataFlowOp = dataSource.translateToDataFlow(); - dataFlowOp.setResource(dataSource.minResource(), dataSource.preferredResource()); + dataFlowOp.setResource(dataSource.getMinResources(), dataSource.getPreferredResources()); } else if (dataSet instanceof SingleInputOperator) { SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet; dataFlowOp = 
translateSingleInputOperator(singleInputOperator); - dataFlowOp.setResource(singleInputOperator.minResource, singleInputOperator.preferredResource()); + dataFlowOp.setResource(singleInputOperator.getMinResources(), singleInputOperator.getPreferredResources()); } else if (dataSet instanceof TwoInputOperator) { TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet; dataFlowOp = translateTwoInputOperator(twoInputOperator); - dataFlowOp.setResource(twoInputOperator.minResource(), twoInputOperator.preferredResource()); + dataFlowOp.setResource(twoInputOperator.getMinResources(), twoInputOperator.getPreferredResources()); } else if (dataSet instanceof BulkIterationResultSet) { - BulkIterationResultSet bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet; + BulkIterationResultSet<?> bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet; dataFlowOp = translateBulkIteration(bulkIterationResultSet); - dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().minResource(), - bulkIterationResultSet.getIterationHead().preferredResource()); + dataFlowOp.setResource(bulkIterationResultSet.getIterationHead().getMinResources(), + bulkIterationResultSet.getIterationHead().getPreferredResources()); } else if (dataSet instanceof DeltaIterationResultSet) { - DeltaIterationResultSet deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet; + DeltaIterationResultSet<?, ?> deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet; dataFlowOp = translateDeltaIteration(deltaIterationResultSet); - dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResource(), - deltaIterationResultSet.getIterationHead().getPreferredResource()); + dataFlowOp.setResource(deltaIterationResultSet.getIterationHead().getMinResources(), + deltaIterationResultSet.getIterationHead().getPreferredResources()); } else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof 
DeltaIteration.WorksetPlaceHolder) { throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action." http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java ---------------------------------------------------------------------- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java index 4ef91b3..723c532 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/PlanNode.java @@ -310,12 +310,12 @@ public abstract class PlanNode implements Visitable<PlanNode>, DumpableNode<Plan return this.parallelism; } - public ResourceSpec getMinResource() { - return this.template.getOperator().getMinResource(); + public ResourceSpec getMinResources() { + return this.template.getOperator().getMinResources(); } - public ResourceSpec getPreferredResource() { - return this.template.getOperator().getPreferredResource(); + public ResourceSpec getPreferredResources() { + return this.template.getOperator().getPreferredResources(); } public long getGuaranteedAvailableMemory() { http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala index 5cfb601..bfe7567 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala @@ -177,48 +177,55 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { "parallelism.") } - /** - * Sets the minimum and preferred resources of this operation. 
- */ - /* - def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : Unit = { - javaSet match { - case ds: DataSource[_] => ds.setResource(minResource, preferredResource) - case op: Operator[_, _] => op.setResource(minResource, preferredResource) - case di: DeltaIterationResultSet[_, _] => - di.getIterationHead.setResource(minResource, preferredResource) - case _ => - throw new UnsupportedOperationException("Operator does not support " + - "configuring custom resources specs.") - } - this - }*/ - /** - * Sets the resource of this operation. - */ - /* - def resource(resource: ResourceSpec) : Unit = { - this.resource(resource, resource) - }*/ +// --------------------------------------------------------------------------- +// Fine-grained resource profiles are an incomplete work-in-progress feature +// The setters are hence commented out at this point. +// --------------------------------------------------------------------------- +// /** +// * Sets the minimum and preferred resources of this operation. +// */ +// @PublicEvolving +// def resources(minResources: ResourceSpec, preferredResources: ResourceSpec) : Unit = { +// javaSet match { +// case ds: DataSource[_] => ds.setResources(minResources, preferredResources) +// case op: Operator[_, _] => op.setResources(minResources, preferredResources) +// case di: DeltaIterationResultSet[_, _] => +// di.getIterationHead.setResources(minResources, preferredResources) +// case _ => +// throw new UnsupportedOperationException("Operator does not support " + +// "configuring custom resources specs.") +// } +// this +// } +// +// /** +// * Sets the resource of this operation. +// */ +// @PublicEvolving +// def resources(resources: ResourceSpec) : Unit = { +// this.resources(resources, resources) +// } /** - * Returns the minimum resource of this operation. + * Returns the minimum resources of this operation. 
*/ - def minResource: ResourceSpec = javaSet match { - case ds: DataSource[_] => ds.minResource() - case op: Operator[_, _] => op.minResource + @PublicEvolving + def minResources: ResourceSpec = javaSet match { + case ds: DataSource[_] => ds.getMinResources() + case op: Operator[_, _] => op.getMinResources() case _ => throw new UnsupportedOperationException("Operator does not support " + "configuring custom resources specs.") } /** - * Returns the preferred resource of this operation. + * Returns the preferred resources of this operation. */ - def preferredResource: ResourceSpec = javaSet match { - case ds: DataSource[_] => ds.preferredResource() - case op: Operator[_, _] => op.preferredResource + @PublicEvolving + def preferredResources: ResourceSpec = javaSet match { + case ds: DataSource[_] => ds.getPreferredResources() + case op: Operator[_, _] => op.getPreferredResources() case _ => throw new UnsupportedOperationException("Operator does not support " + "configuring custom resources specs.") http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index ae1c39a..c443758 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -144,21 +144,23 @@ public class DataStream<T> { } /** - * Gets the minimum resource for this operator. + * Gets the minimum resources for this operator. * - * @return The minimum resource set for this operator. + * @return The minimum resources set for this operator. 
*/ - public ResourceSpec minResource() { - return transformation.getMinResource(); + @PublicEvolving + public ResourceSpec getMinResources() { + return transformation.getMinResources(); } /** - * Gets the preferred resource for this operator. + * Gets the preferred resources for this operator. * - * @return The preferred resource set for this operator. + * @return The preferred resources set for this operator. */ - public ResourceSpec preferredResource() { - return transformation.getPreferredResource(); + @PublicEvolving + public ResourceSpec getPreferredResources() { + return transformation.getPreferredResources(); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index 69e21d6..39d81c6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.transformations.SinkTransformation; @Public public class DataStreamSink<T> { - SinkTransformation<T> transformation; + private final SinkTransformation<T> transformation; @SuppressWarnings("unchecked") protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) { @@ -113,41 +113,45 @@ public class DataStreamSink<T> { return this; } - /** - * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will - * be considered in resource resize feature for future plan. - * - * @param minResource The minimum resource for this sink. 
- * @param preferredResource The preferred resource for this sink - * @return The sink with set minimum and preferred resources. - */ - /* - public DataStreamSink<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) { - Preconditions.checkNotNull(minResource != null && preferredResource != null, - "The min and preferred resources must be not null."); - Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), - "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); - - transformation.setResource(minResource, preferredResource); - - return this; - }*/ - - /** - * Sets the resource for this sink, the minimum and preferred resources are the same by default. - * - * @param resource The resource for this sink. - * @return The sink with set minimum and preferred resources. - */ - /* - public DataStreamSink<T> setResource(ResourceSpec resource) { - Preconditions.checkNotNull(resource != null, "The resource must be not null."); - Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0."); - - transformation.setResource(resource, resource); - - return this; - }*/ +// --------------------------------------------------------------------------- +// Fine-grained resource profiles are an incomplete work-in-progress feature +// The setters are hence commented out at this point. +// --------------------------------------------------------------------------- +// /** +// * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will +// * be considered in resource resize feature for future plan. +// * +// * @param minResources The minimum resources for this sink. +// * @param preferredResources The preferred resources for this sink +// * @return The sink with set minimum and preferred resources. 
+// */ +// @PublicEvolving +// public DataStreamSink<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) { +// Preconditions.checkNotNull(minResources, "The min resources must be not null."); +// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); +// Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), +// "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); +// +// transformation.setResources(minResources, preferredResources); +// +// return this; +// } +// +// /** +// * Sets the resource for this sink, the minimum and preferred resources are the same by default. +// * +// * @param resources The resource for this sink. +// * @return The sink with set minimum and preferred resources. +// */ +// @PublicEvolving +// public DataStreamSink<T> setResources(ResourceSpec resources) { +// Preconditions.checkNotNull(resources, "The resource must be not null."); +// Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0."); +// +// transformation.setResources(resources, resources); +// +// return this; +// } /** * Turns off chaining for this operator so thread co-location will not be http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index d856603..859c6d5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -154,41 +154,45 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> { return this; } - /** - * Sets the minimum and preferred resources for this operator, and the lower and upper resource limits will - * be considered in dynamic resource resize feature for future plan. - * - * @param minResource The minimum resource for this operator. - * @param preferredResource The preferred resource for this operator. - * @return The operator with set minimum and preferred resources. - */ - /* - public SingleOutputStreamOperator<T> setResource(ResourceSpec minResource, ResourceSpec preferredResource) { - Preconditions.checkArgument(minResource != null && preferredResource != null, - "The min and preferred resources must be not null."); - Preconditions.checkArgument(minResource.isValid() && preferredResource.isValid() && minResource.lessThanOrEqual(preferredResource), - "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); - - transformation.setResource(minResource, preferredResource); - - return this; - }*/ - - /** - * Sets the resource for this operator, the minimum and preferred resources are the same by default. - * - * @param resource The resource for this operator. - * @return The operator with set minimum and preferred resources. - */ - /* - public SingleOutputStreamOperator<T> setResource(ResourceSpec resource) { - Preconditions.checkNotNull(resource != null, "The resource must be not null."); - Preconditions.checkArgument(resource.isValid(), "The resource values must be greater than 0."); - - transformation.setResource(resource, resource); - - return this; - }*/ +// --------------------------------------------------------------------------- +// Fine-grained resource profiles are an incomplete work-in-progress feature +// The setters are hence commented out at this point. 
+// --------------------------------------------------------------------------- +// /** +// * Sets the minimum and preferred resources for this operator, and the lower and upper resource limits will +// * be considered in dynamic resource resize feature for future plan. +// * +// * @param minResources The minimum resources for this operator. +// * @param preferredResources The preferred resources for this operator. +// * @return The operator with set minimum and preferred resources. +// */ +// @PublicEvolving +// public SingleOutputStreamOperator<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) { +// Preconditions.checkNotNull(minResources, "The min resources must be not null."); +// Preconditions.checkNotNull(preferredResources, "The preferred resources must be not null."); +// Preconditions.checkArgument(minResources.isValid() && preferredResources.isValid() && minResources.lessThanOrEqual(preferredResources), +// "The values in resource must be not less than 0 and the preferred resource must be greater than the min resource."); +// +// transformation.setResources(minResources, preferredResources); +// +// return this; +// } +// +// /** +// * Sets the resources for this operator, the minimum and preferred resources are the same by default. +// * +// * @param resources The resources for this operator. +// * @return The operator with set minimum and preferred resources. 
+// */ +// @PublicEvolving +// public SingleOutputStreamOperator<T> setResources(ResourceSpec resources) { +// Preconditions.checkNotNull(resources, "The resource must be not null."); +// Preconditions.checkArgument(resources.isValid(), "The resource values must be greater than 0."); +// +// transformation.setResources(resources, resources); +// +// return this; +// } private boolean canBeParallel() { return !nonParallel; http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index fcbc607..a87e63d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -416,7 +416,7 @@ public class StreamGraph extends StreamingPlan { public void setResource(int vertexID, ResourceSpec minResource, ResourceSpec preferredResource) { if (getStreamNode(vertexID) != null) { - getStreamNode(vertexID).setResource(minResource, preferredResource); + getStreamNode(vertexID).setResources(minResource, preferredResource); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index af92421..f55ff47 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -202,8 +202,8 @@ public class StreamGraphGenerator { streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash()); } - if (transform.getMinResource() != null && transform.getPreferredResource() != null) { - streamGraph.setResource(transform.getId(), transform.getMinResource(), transform.getPreferredResource()); + if (transform.getMinResources() != null && transform.getPreferredResources() != null) { + streamGraph.setResource(transform.getId(), transform.getMinResources(), transform.getPreferredResources()); } return transformedIds; http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 0bf9adf..2d2e1e75 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -49,8 +49,8 @@ public class StreamNode implements Serializable { * dynamic scaling and the number of key groups used for partitioned state. 
*/ private int maxParallelism; - private ResourceSpec minResource; - private ResourceSpec preferredResource; + private ResourceSpec minResources; + private ResourceSpec preferredResources; private Long bufferTimeout = null; private final String operatorName; private String slotSharingGroup; @@ -168,17 +168,17 @@ public class StreamNode implements Serializable { this.maxParallelism = maxParallelism; } - public ResourceSpec getMinResource() { - return minResource; + public ResourceSpec getMinResources() { + return minResources; } - public ResourceSpec getPreferredResource() { - return preferredResource; + public ResourceSpec getPreferredResources() { + return preferredResources; } - public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) { - this.minResource = minResource; - this.preferredResource = preferredResource; + public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) { + this.minResources = minResources; + this.preferredResources = preferredResources; } public Long getBufferTimeout() { http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java index 1d22454..24b5736 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java @@ -30,6 +30,8 @@ import org.apache.flink.util.Preconditions; import java.util.Collection; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * A {@code StreamTransformation} represents the 
operation that creates a * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every @@ -127,16 +129,16 @@ public abstract class StreamTransformation<T> { private int maxParallelism = -1; /** - * The minimum resource for this stream transformation. It defines the lower limit for - * dynamic resource resize in future plan. + * The minimum resources for this stream transformation. It defines the lower limit for + * dynamic resources resize in future plan. */ - private ResourceSpec minResource = ResourceSpec.UNKNOWN; + private ResourceSpec minResources = ResourceSpec.UNKNOWN; /** - * The preferred resource for this stream transformation. It defines the upper limit for + * The preferred resources for this stream transformation. It defines the upper limit for * dynamic resource resize in future plan. */ - private ResourceSpec preferredResource = ResourceSpec.UNKNOWN; + private ResourceSpec preferredResources = ResourceSpec.UNKNOWN; /** * User-specified ID for this transformation. This is used to assign the @@ -229,12 +231,12 @@ public abstract class StreamTransformation<T> { /** * Sets the minimum and preferred resources for this stream transformation. * - * @param minResource The minimum resource of this transformation. - * @param preferredResource The preferred resource of this transformation. + * @param minResources The minimum resource of this transformation. + * @param preferredResources The preferred resource of this transformation. */ - public void setResource(ResourceSpec minResource, ResourceSpec preferredResource) { - this.minResource = minResource; - this.preferredResource = preferredResource; + public void setResources(ResourceSpec minResources, ResourceSpec preferredResources) { + this.minResources = checkNotNull(minResources); + this.preferredResources = checkNotNull(preferredResources); } /** @@ -242,8 +244,8 @@ public abstract class StreamTransformation<T> { * * @return The minimum resource of this transformation. 
*/ - public ResourceSpec getMinResource() { - return minResource; + public ResourceSpec getMinResources() { + return minResources; } /** @@ -251,8 +253,8 @@ public abstract class StreamTransformation<T> { * * @return The preferred resource of this transformation. */ - public ResourceSpec getPreferredResource() { - return preferredResource; + public ResourceSpec getPreferredResources() { + return preferredResources; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index e42fb3f..35e1f23 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -146,36 +146,42 @@ class DataStream[T](stream: JavaStream[T]) { } /** - * Returns the minimum resource of this operation. + * Returns the minimum resources of this operation. */ - def minResource: ResourceSpec = stream.minResource() - - /** - * Returns the preferred resource of this operation. - */ - def preferredResource: ResourceSpec = stream.preferredResource() - - /** - * Sets the minimum and preferred resources of this operation. 
- */ - /* - def resource(minResource: ResourceSpec, preferredResource: ResourceSpec) : DataStream[T] = - stream match { - case stream : SingleOutputStreamOperator[T] => asScalaStream(stream.setResource( - minResource, preferredResource)) - case _ => - throw new UnsupportedOperationException("Operator does not support " + - "configuring custom resources specs.") - this - }*/ + @PublicEvolving + def minResources: ResourceSpec = stream.getMinResources() /** - * Sets the resource of this operation. + * Returns the preferred resources of this operation. */ - /* - def resource(resource: ResourceSpec) : Unit = { - this.resource(resource, resource) - }*/ + @PublicEvolving + def preferredResources: ResourceSpec = stream.getPreferredResources() + +// --------------------------------------------------------------------------- +// Fine-grained resource profiles are an incomplete work-in-progress feature +// The setters are hence commented out at this point. +// --------------------------------------------------------------------------- +// /** +// * Sets the minimum and preferred resources of this operation. +// */ +// @PublicEvolving +// def resources(minResources: ResourceSpec, preferredResources: ResourceSpec) : DataStream[T] = +// stream match { +// case stream : SingleOutputStreamOperator[T] => asScalaStream( +// stream.setResources(minResources, preferredResources)) +// case _ => +// throw new UnsupportedOperationException("Operator does not support " + +// "configuring custom resources specs.") +// this +// } +// +// /** +// * Sets the resource of this operation. +// */ +// @PublicEvolving +// def resources(resources: ResourceSpec) : Unit = { +// this.resources(resources, resources) +// } /** * Gets the name of the current data stream. 
This name is http://git-wip-us.apache.org/repos/asf/flink/blob/9912de21/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala index 907ad9f..7abb392 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/ScalaAPICompletenessTestBase.scala @@ -41,21 +41,6 @@ abstract class ScalaAPICompletenessTestBase extends TestLogger { protected def isExcludedByName(method: Method): Boolean /** - * Determines whether a method is excluded by an interface it uses. - */ - protected def isExcludedByInterface(method: Method): Boolean = { - val excludedInterfaces = - Set("org.apache.spark.Logging", "org.apache.hadoop.mapreduce.HadoopMapReduceUtil") - def toComparisionKey(method: Method) = - (method.getReturnType, method.getName, method.getGenericReturnType) - val interfaces = method.getDeclaringClass.getInterfaces.filter { i => - excludedInterfaces.contains(i.getName) - } - val excludedMethods = interfaces.flatMap(_.getMethods.map(toComparisionKey)) - excludedMethods.contains(toComparisionKey(method)) - } - - /** * Utility to be called during the test. 
*/ protected def checkMethods( @@ -66,26 +51,33 @@ abstract class ScalaAPICompletenessTestBase extends TestLogger { val javaMethods = javaClass.getMethods .filterNot(_.isAccessible) .filterNot(isExcludedByName) - .filterNot(isExcludedByInterface) .map(m => m.getName).toSet val scalaMethods = scalaClass.getMethods .filterNot(_.isAccessible) .filterNot(isExcludedByName) - .filterNot(isExcludedByInterface) .map(m => m.getName).toSet val missingMethods = javaMethods -- scalaMethods - for (method <- missingMethods) { - fail("Method " + method + " from " + javaClass + " is missing from " + scalaClassName + ".") + for (javaMethod <- missingMethods) { + // check if the method simply follows different getter / setter conventions in Scala / Java + // for example Java: getFoo() should match Scala: foo() + if (!containsScalaGetterLike(javaMethod, scalaMethods)) { + fail(s"Method $javaMethod from $javaClass is missing from $scalaClassName.") + } } } - protected def checkEquality(scalaInstance: AnyRef, extractJavaFun : ((AnyRef) => AnyRef)) { - val javaInstance = extractJavaFun(scalaInstance) + protected def containsScalaGetterLike(javaMethod: String, scalaMethods: Set[String]): Boolean = { + if (javaMethod.startsWith("get") && javaMethod.length >= 4) { + val scalaMethodName = Character.toLowerCase(javaMethod.charAt(3)) + javaMethod.substring(4) + scalaMethods.contains(scalaMethodName) + } else { + false + } } - + /** * Tests to be performed to ensure API completeness. */
