Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/incubator-flink/pull/72#discussion_r15466553
--- Diff:
flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/DataStream.java
---
@@ -0,0 +1,700 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.functions.FilterFunction;
+import org.apache.flink.api.java.functions.FlatMapFunction;
+import org.apache.flink.api.java.functions.GroupReduceFunction;
+import org.apache.flink.api.java.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import
org.apache.flink.streaming.api.StreamExecutionEnvironment.ConnectionType;
+import org.apache.flink.streaming.api.collector.OutputSelector;
+import org.apache.flink.streaming.api.function.co.CoMapFunction;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.WriteFormatAsCsv;
+import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
+import
org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
+import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+import
org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+
+/**
+ * A DataStream represents a stream of elements of the same type. A
DataStream
+ * can be transformed into another DataStream by applying a transformation
as
+ * for example
+ * <ul>
+ * <li>{@link DataStream#map},</li>
+ * <li>{@link DataStream#filter}, or</li>
+ * <li>{@link DataStream#batchReduce}.</li>
+ * </ul>
+ *
+ * @param <T>
+ * The type of the DataStream, i.e., the type of the elements
of the
+ * DataStream.
+ */
+public class DataStream<T extends Tuple> {
+
+ protected static Integer counter = 0;
+ protected final StreamExecutionEnvironment environment;
+ protected String id;
+ protected int degreeOfParallelism;
+ protected String userDefinedName;
+ protected OutputSelector<T> outputSelector;
+ protected List<String> connectIDs;
+ protected List<ConnectionType> ctypes;
+ protected List<Integer> cparams;
+ protected boolean iterationflag;
+ protected Integer iterationID;
+
+ /**
+ * Create a new {@link DataStream} in the given execution environment
with
+ * partitioning set to shuffle by default.
+ *
+ * @param environment
+ * StreamExecutionEnvironment
+ * @param operatorType
+ * The type of the operator in the component
+ */
+ protected DataStream(StreamExecutionEnvironment environment, String
operatorType) {
+ if (environment == null) {
+ throw new NullPointerException("context is null");
+ }
+
+ // TODO add name based on component number an preferable
sequential id
+ counter++;
+ this.id = operatorType + "-" + counter.toString();
+ this.environment = environment;
+ this.degreeOfParallelism = environment.getDegreeOfParallelism();
+ initConnections();
+
+ }
+
+ /**
+ * Create a new DataStream by creating a copy of another DataStream
+ *
+ * @param dataStream
+ * The DataStream that will be copied.
+ */
+ protected DataStream(DataStream<T> dataStream) {
+ this.environment = dataStream.environment;
+ this.id = dataStream.id;
+ this.degreeOfParallelism = dataStream.degreeOfParallelism;
+ this.userDefinedName = dataStream.userDefinedName;
+ this.outputSelector = dataStream.outputSelector;
+ this.connectIDs = new ArrayList<String>(dataStream.connectIDs);
+ this.ctypes = new
ArrayList<StreamExecutionEnvironment.ConnectionType>(dataStream.ctypes);
+ this.cparams = new ArrayList<Integer>(dataStream.cparams);
+ this.iterationflag = dataStream.iterationflag;
+ this.iterationID = dataStream.iterationID;
+ }
+
+ /**
+ * Initialize the connection and partitioning among the connected
+ * {@link DataStream}s.
+ */
+ private void initConnections() {
+ connectIDs = new ArrayList<String>();
+ connectIDs.add(getId());
+ ctypes = new
ArrayList<StreamExecutionEnvironment.ConnectionType>();
+ ctypes.add(ConnectionType.SHUFFLE);
+ cparams = new ArrayList<Integer>();
+ cparams.add(0);
+
+ }
+
+ /**
+ * Returns the ID of the {@link DataStream}.
+ *
+ * @return ID of the DataStream
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Sets the mutability of the operator represented by the DataStream.
If the
+ * operator is set to mutable, the tuples received in the user defined
+ * functions, will be reused after the function call. Setting an
operator to
+ * mutable greatly reduces garbage collection overhead and thus
scalability.
+ *
+ * @param isMutable
+ * The mutability of the operator.
+ * @return The DataStream with mutability set.
+ */
+ public DataStream<T> setMutability(boolean isMutable) {
+ environment.setMutability(this, isMutable);
+ return this;
+ }
+
+ /**
+ * Sets the degree of parallelism for this operator. The degree must be
1 or
+ * more.
+ *
+ * @param dop
+ * The degree of parallelism for this operator.
+ * @return The operator with set degree of parallelism.
+ */
+ public DataStream<T> setParallelism(int dop) {
+ if (dop < 1) {
+ throw new IllegalArgumentException("The parallelism of
an operator must be at least 1.");
+ }
+ this.degreeOfParallelism = dop;
+
+ environment.setOperatorParallelism(this);
+
+ return new DataStream<T>(this);
+
+ }
+
+ /**
+ * Gets the degree of parallelism for this operator.
+ *
+ * @return The parallelism set for this operator.
+ */
+ public int getParallelism() {
+ return this.degreeOfParallelism;
+ }
+
+ /**
+ * Gives the data transformation(vertex) a user defined name in order
to use
+ * at directed outputs. The {@link OutputSelector} of the input vertex
+ * should use this name for directed emits.
+ *
+ * @param name
+ * The name to set
+ * @return The named DataStream.
+ */
+ public DataStream<T> name(String name) {
+ // copy?
+ if (name == "") {
--- End diff --
This is afaik not a check for an empty string. It checks if the reference
of `name` is the same as the "".
I think `name.equals("")` is correct here.
http://stackoverflow.com/questions/3321526/should-i-use-string-isempty-or-equalsstring
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---