[FLINK-4265] [dataset api] Add a NoOpOperator

Adds a NoOpOperator which is unwound in OperatorTranslation.translate.
This will be first used by Gelly as a placeholder to support implicit
operator reuse.

This closes #2294


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66d4b872
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66d4b872
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66d4b872

Branch: refs/heads/flip-6
Commit: 66d4b8724b9e9b09225d2bbd3132dc2efdcf843a
Parents: cab76f6
Author: Greg Hogan <[email protected]>
Authored: Mon Jul 25 14:05:56 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Tue Sep 6 09:25:48 2016 -0400

----------------------------------------------------------------------
 .../flink/api/java/operators/NoOpOperator.java  | 51 ++++++++++++++++++++
 .../api/java/operators/OperatorTranslation.java | 20 ++++----
 2 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/66d4b872/flink-java/src/main/java/org/apache/flink/api/java/operators/NoOpOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/NoOpOperator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/NoOpOperator.java
new file mode 100644
index 0000000..369ab9e
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/NoOpOperator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This operator will be ignored during translation.
+ *
+ * @param <IN> The type of the data set passed through the operator.
+ */
+@Internal
+public class NoOpOperator<IN> extends DataSet<IN> {
+
+       private DataSet<IN> input;
+
+       public NoOpOperator(DataSet<IN> input, TypeInformation<IN> resultType) {
+               super(input.getExecutionEnvironment(), resultType);
+
+               this.input = input;
+       }
+
+       public DataSet<IN> getInput() {
+               return input;
+       }
+
+       public void setInput(DataSet<IN> input) {
+               Preconditions.checkNotNull(input);
+
+               this.input = input;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/66d4b872/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
index 74811a3..3f44d58 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java
@@ -19,15 +19,8 @@
 package org.apache.flink.api.java.operators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.AbstractUdfOperator;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
@@ -35,8 +28,14 @@ import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.BulkIterationBase;
 import org.apache.flink.api.common.operators.base.DeltaIterationBase;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.configuration.Configuration;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 @Internal
 public class OperatorTranslation {
        
@@ -70,7 +69,10 @@ public class OperatorTranslation {
        
        
        private <T> Operator<T> translate(DataSet<T> dataSet) {
-               
+               while (dataSet instanceof NoOpOperator) {
+                       dataSet = ((NoOpOperator<T>) dataSet).getInput();
+               }
+
                // check if we have already translated that data set (operation 
or source)
                Operator<?> previous = (Operator<?>) 
this.translated.get(dataSet);
                if (previous != null) {

Reply via email to