[hotfix] Move DataStreamUtils to the datastream API package so that we can 
actually use it to expose package-private constructors or methods for 
experimental features.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfe6f84c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfe6f84c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfe6f84c

Branch: refs/heads/master
Commit: bfe6f84cf1b99f1dfa801a1818fa51ccc7817c9b
Parents: dea4172
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Wed Feb 7 11:04:14 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Feb 9 18:23:37 2018 +0100

----------------------------------------------------------------------
 .../api/datastream/DataStreamUtils.java         | 115 +++++++++++++++++++
 .../streaming/experimental/CollectSink.java     |   2 +-
 .../streaming/experimental/DataStreamUtils.java | 115 -------------------
 .../experimental/SocketStreamIterator.java      |   6 +-
 .../streaming/api/scala/DataStreamUtils.scala   |  47 ++++++++
 .../experimental/scala/DataStreamUtils.scala    |  48 --------
 .../streaming/experimental/CollectITCase.java   |   2 +-
 7 files changed, 168 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
new file mode 100644
index 0000000..d145d6f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.streaming.experimental.SocketStreamIterator;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+
+/**
+ * A collection of experimental utilities for {@link DataStream DataStreams}.
+ *
+ * <p>This experimental class is relocated from flink-streaming-contrib. 
Please see package-info.java
+ * for more information.
+ */
+@PublicEvolving
+public final class DataStreamUtils {
+
+       /**
+        * Returns an iterator to iterate over the elements of the DataStream.
+        * @return The iterator
+        */
+       public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) 
throws IOException {
+
+               TypeSerializer<OUT> serializer = 
stream.getType().createSerializer(
+                               stream.getExecutionEnvironment().getConfig());
+
+               SocketStreamIterator<OUT> iter = new 
SocketStreamIterator<OUT>(serializer);
+
+               //Find out what IP of us should be given to CollectSink, that 
it will be able to connect to
+               StreamExecutionEnvironment env = 
stream.getExecutionEnvironment();
+               InetAddress clientAddress;
+
+               if (env instanceof RemoteStreamEnvironment) {
+                       String host = ((RemoteStreamEnvironment) env).getHost();
+                       int port = ((RemoteStreamEnvironment) env).getPort();
+                       try {
+                               clientAddress = 
ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 
400);
+                       }
+                       catch (Exception e) {
+                               throw new IOException("Could not determine an 
suitable network address to " +
+                                               "receive back data from the 
streaming program.", e);
+                       }
+               } else if (env instanceof LocalStreamEnvironment) {
+                       clientAddress = InetAddress.getLoopbackAddress();
+               } else {
+                       try {
+                               clientAddress = InetAddress.getLocalHost();
+                       } catch (UnknownHostException e) {
+                               throw new IOException("Could not determine this 
machines own local address to " +
+                                               "receive back data from the 
streaming program.", e);
+                       }
+               }
+
+               DataStreamSink<OUT> sink = stream.addSink(new 
CollectSink<OUT>(clientAddress, iter.getPort(), serializer));
+               sink.setParallelism(1); // It would not work if multiple 
instances would connect to the same port
+
+               (new CallExecute(env, iter)).start();
+
+               return iter;
+       }
+
+       private static class CallExecute extends Thread {
+
+               private final StreamExecutionEnvironment toTrigger;
+               private final SocketStreamIterator<?> toNotify;
+
+               private CallExecute(StreamExecutionEnvironment toTrigger, 
SocketStreamIterator<?> toNotify) {
+                       this.toTrigger = toTrigger;
+                       this.toNotify = toNotify;
+               }
+
+               @Override
+               public void run(){
+                       try {
+                               toTrigger.execute();
+                       }
+                       catch (Throwable t) {
+                               toNotify.notifyOfError(t);
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Private constructor to prevent instantiation.
+        */
+       private DataStreamUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
index 23b5280..aa0b53b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
@@ -35,7 +35,7 @@ import java.net.Socket;
  * for more information.
  */
 @Internal
-class CollectSink<IN> extends RichSinkFunction<IN> {
+public class CollectSink<IN> extends RichSinkFunction<IN> {
 
        private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
deleted file mode 100644
index 59ad6a8..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.experimental;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.net.ConnectionUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.Iterator;
-
-/**
- * A collection of utilities for {@link DataStream DataStreams}.
- *
- * <p>This experimental class is relocated from flink-streaming-contrib. 
Please see package-info.java
- * for more information.
- */
-@PublicEvolving
-public final class DataStreamUtils {
-
-       /**
-        * Returns an iterator to iterate over the elements of the DataStream.
-        * @return The iterator
-        */
-       public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) 
throws IOException {
-
-               TypeSerializer<OUT> serializer = 
stream.getType().createSerializer(
-                               stream.getExecutionEnvironment().getConfig());
-
-               SocketStreamIterator<OUT> iter = new 
SocketStreamIterator<OUT>(serializer);
-
-               //Find out what IP of us should be given to CollectSink, that 
it will be able to connect to
-               StreamExecutionEnvironment env = 
stream.getExecutionEnvironment();
-               InetAddress clientAddress;
-
-               if (env instanceof RemoteStreamEnvironment) {
-                       String host = ((RemoteStreamEnvironment) env).getHost();
-                       int port = ((RemoteStreamEnvironment) env).getPort();
-                       try {
-                               clientAddress = 
ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port), 2000, 
400);
-                       }
-                       catch (Exception e) {
-                               throw new IOException("Could not determine an 
suitable network address to " +
-                                               "receive back data from the 
streaming program.", e);
-                       }
-               } else if (env instanceof LocalStreamEnvironment) {
-                       clientAddress = InetAddress.getLoopbackAddress();
-               } else {
-                       try {
-                               clientAddress = InetAddress.getLocalHost();
-                       } catch (UnknownHostException e) {
-                               throw new IOException("Could not determine this 
machines own local address to " +
-                                               "receive back data from the 
streaming program.", e);
-                       }
-               }
-
-               DataStreamSink<OUT> sink = stream.addSink(new 
CollectSink<OUT>(clientAddress, iter.getPort(), serializer));
-               sink.setParallelism(1); // It would not work if multiple 
instances would connect to the same port
-
-               (new CallExecute(env, iter)).start();
-
-               return iter;
-       }
-
-       private static class CallExecute extends Thread {
-
-               private final StreamExecutionEnvironment toTrigger;
-               private final SocketStreamIterator<?> toNotify;
-
-               private CallExecute(StreamExecutionEnvironment toTrigger, 
SocketStreamIterator<?> toNotify) {
-                       this.toTrigger = toTrigger;
-                       this.toNotify = toNotify;
-               }
-
-               @Override
-               public void run(){
-                       try {
-                               toTrigger.execute();
-                       }
-                       catch (Throwable t) {
-                               toNotify.notifyOfError(t);
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Private constructor to prevent instantiation.
-        */
-       private DataStreamUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
index 871c0f7..ccb54ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.experimental;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -42,7 +43,7 @@ import java.util.NoSuchElementException;
  * @param <T> The type of elements returned from the iterator.
  */
 @PublicEvolving
-class SocketStreamIterator<T> implements Iterator<T> {
+public class SocketStreamIterator<T> implements Iterator<T> {
 
        /** Server socket to listen at. */
        private final ServerSocket socket;
@@ -62,7 +63,8 @@ class SocketStreamIterator<T> implements Iterator<T> {
        /** Async error, for example by the executor of the program that 
produces the stream. */
        private volatile Throwable error;
 
-       SocketStreamIterator(TypeSerializer<T> serializer) throws IOException {
+       @Internal
+       public SocketStreamIterator(TypeSerializer<T> serializer) throws 
IOException {
                this.serializer = serializer;
                try {
                        socket = new ServerSocket(0, 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
new file mode 100644
index 0000000..74dd66a
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.{DataStreamUtils => 
JavaStreamUtils}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+/**
+  * This class provides simple utility methods for collecting a [[DataStream]],
+  * effectively enriching it with the functionality encapsulated by 
[[DataStreamUtils]].
+  *
+  * This experimental class is relocated from flink-streaming-contrib.
+  *
+  * @param self DataStream
+  */
+@PublicEvolving
+class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) {
+
+  /**
+    * Returns a scala iterator to iterate over the elements of the DataStream.
+    * @return The iterator
+    */
+  def collect() : Iterator[T] = {
+    JavaStreamUtils.collect(self.javaStream).asScala
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
deleted file mode 100644
index 8c4beff..0000000
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.experimental.scala
-
-import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.experimental.{DataStreamUtils => 
JavaStreamUtils}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-/**
-  * This class provides simple utility methods for collecting a [[DataStream]],
-  * effectively enriching it with the functionality encapsulated by 
[[DataStreamUtils]].
-  *
-  * This experimental class is relocated from flink-streaming-contrib.
-  *
-  * @param self DataStream
-  */
-@PublicEvolving
-class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) {
-
-  /**
-    * Returns a scala iterator to iterate over the elements of the DataStream.
-    * @return The iterator
-    */
-  def collect() : Iterator[T] = {
-    JavaStreamUtils.collect(self.javaStream).asScala
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
index ad07390..0535bf7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
@@ -21,8 +21,8 @@ package org.apache.flink.test.streaming.experimental;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.experimental.DataStreamUtils;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.TestLogger;
 

Reply via email to