Move delta window functions to package functions.windowing.delta

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3be2dc1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3be2dc1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3be2dc1a

Branch: refs/heads/master
Commit: 3be2dc1aaaeae4cbbfaecaf4998a64f1199260eb
Parents: 6610cae
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Sep 25 11:34:10 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Sep 28 17:04:17 2015 +0200

----------------------------------------------------------------------
 .../windowing/delta/CosineDistance.java         | 92 ++++++++++++++++++++
 .../windowing/delta/DeltaFunction.java          | 44 ++++++++++
 .../windowing/delta/EuclideanDistance.java      | 58 ++++++++++++
 .../delta/ExtractionAwareDeltaFunction.java     | 90 +++++++++++++++++++
 .../windowing/deltafunction/CosineDistance.java | 92 --------------------
 .../windowing/deltafunction/DeltaFunction.java  | 44 ----------
 .../deltafunction/EuclideanDistance.java        | 58 ------------
 .../ExtractionAwareDeltaFunction.java           | 90 -------------------
 .../streaming/api/windowing/helper/Delta.java   |  2 +-
 .../api/windowing/policy/DeltaPolicy.java       |  2 +-
 .../api/complex/ComplexIntegrationTest.java     |  2 +-
 .../deltafunction/CosineDistanceTest.java       |  1 +
 .../deltafunction/EuclideanDistanceTest.java    |  1 +
 .../api/windowing/policy/DeltaPolicyTest.java   |  4 +-
 .../examples/windowing/TopSpeedWindowing.java   |  2 +-
 .../streaming/api/scala/windowing/Delta.scala   |  2 +-
 16 files changed, 293 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
new file mode 100644
index 0000000..7859b2c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta;
+
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * This delta function calculates the cosine distance between two given vectors.
+ * The cosine distance is defined as: cosineDistance=1-cosineSimilarity
+ * 
+ * Cosine similarity: http://en.wikipedia.org/wiki/Cosine_similarity
+ * 
+ * @param <DATA>
+ *            The input data type. This delta function works with a double[],
+ *            but can extract/convert to it from any other given object in case
+ *            the respective extractor has been set. See
+ *            {@link ExtractionAwareDeltaFunction} for more information.
+ */
+public class CosineDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> {
+
+       /**
+        * auto-generated id
+        */
+       private static final long serialVersionUID = -1217813582965151599L;
+
+       public CosineDistance() {
+               super(null);
+       }
+
+       public CosineDistance(Extractor<DATA, double[]> converter) {
+               super(converter);
+       }
+
+       @Override
+       public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
+               if (isNullvector(oldDataPoint, newDataPoint)) {
+                       return 0;
+               }
+
+               if (oldDataPoint.length != newDataPoint.length) {
+                       throw new IllegalArgumentException(
+                                       "The size of two input arrays are not same, can not compute cosine distance");
+               }
+
+               double sum1 = 0;
+               double sum2 = 0;
+               for (int i = 0; i < oldDataPoint.length; i++) {
+                       sum1 += oldDataPoint[i] * oldDataPoint[i];
+                       sum2 += newDataPoint[i] * newDataPoint[i];
+               }
+               sum1 = Math.sqrt(sum1);
+               sum2 = Math.sqrt(sum2);
+
+               return 1d - (dotProduct(oldDataPoint, newDataPoint) / (sum1 * sum2));
+       }
+
+       private double dotProduct(double[] a, double[] b) {
+               double result = 0;
+               for (int i = 0; i < a.length; i++) {
+                       result += a[i] * b[i];
+               }
+               return result;
+       }
+
+       private boolean isNullvector(double[]... vectors) {
+               outer: for (double[] v : vectors) {
+                       for (double field : v) {
+                               if (field != 0) {
+                                       continue outer;
+                               }
+                       }
+                       // This position is only reached in case all fields are 0.
+                       return true;
+               }
+               return false;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
new file mode 100644
index 0000000..0ce2bf9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta;
+
+import java.io.Serializable;
+
+/**
+ * This interface allows the implementation of a function which calculates the
+ * delta between two data points. Delta functions might be used in delta
+ * policies and allow flexible adaptive windowing based on the arriving data
+ * points.
+ *
+ * @param <DATA>
+ *            The type of input data which can be compared using this function.
+ */
+public interface DeltaFunction<DATA> extends Serializable {
+
+       /**
+        * Calculates the delta between two given data points.
+        * 
+        * @param oldDataPoint
+        *            the old data point.
+        * @param newDataPoint
+        *            the new data point.
+        * @return the delta between the two given points.
+        */
+       public double getDelta(DATA oldDataPoint, DATA newDataPoint);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
new file mode 100644
index 0000000..f9e8ec7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta;
+
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * This delta function calculates the euclidean distance between two given
+ * points.
+ * 
+ * Euclidean distance: http://en.wikipedia.org/wiki/Euclidean_distance
+ * 
+ * @param <DATA>
+ *            The input data type. This delta function works with a double[],
+ *            but can extract/convert to it from any other given object in case
+ *            the respective extractor has been set. See
+ *            {@link ExtractionAwareDeltaFunction} for more information.
+ */
+public class EuclideanDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> {
+
+       public EuclideanDistance() {
+               super(null);
+       }
+
+       public EuclideanDistance(Extractor<DATA, double[]> converter) {
+               super(converter);
+       }
+
+       /**
+        * auto-generated version id
+        */
+       private static final long serialVersionUID = 3119432599634512359L;
+
+       @Override
+       public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
+               double result = 0;
+               for (int i = 0; i < oldDataPoint.length; i++) {
+                       result += (oldDataPoint[i] - newDataPoint[i]) * (oldDataPoint[i] - newDataPoint[i]);
+               }
+               return Math.sqrt(result);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
new file mode 100644
index 0000000..bd5b0b9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing.delta;
+
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * Extend this abstract class to implement a delta function which is aware of
+ * extracting the data on which the delta is calculated from a more complex data
+ * structure. For example in case you want to be able to run a delta only on one
+ * field of a Tuple type or only on some fields from an array.
+ * 
+ * @param <DATA>
+ *            The input data type. The input of this type will be passed to the
+ *            extractor which will transform into a TO-object. The delta
+ *            function then runs on this TO-object.
+ * @param <TO>
+ *            The type on which the delta function runs. (The type of the delta
+ *            function)
+ */
+public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFunction<DATA> {
+
+       /**
+        * Generated Version ID
+        */
+       private static final long serialVersionUID = 6927486219702689554L;
+       private Extractor<DATA, TO> converter;
+
+       public ExtractionAwareDeltaFunction(Extractor<DATA, TO> converter) {
+               this.converter = converter;
+       }
+
+       /**
+        * This method takes the two data point and runs the set extractor on it.
+        * The delta function implemented at {@link #getNestedDelta} is then called
+        * with the extracted data. In case no extractor is set the input data gets
+        * passes to {@link #getNestedDelta} as-is. The return value is just
+        * forwarded from {@link #getNestedDelta}.
+        * 
+        * @param oldDataPoint
+        *            the older data point as raw data (before extraction).
+        * @param newDataPoint
+        *            the new data point as raw data (before extraction).
+        * @return the delta between the two points.
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       public double getDelta(DATA oldDataPoint, DATA newDataPoint) {
+               if (converter == null) {
+                       // In case no conversion/extraction is required, we can cast DATA to
+                       // TO
+                       // => Therefore, "unchecked" warning is suppressed for this method.
+                       return getNestedDelta((TO) oldDataPoint, (TO) newDataPoint);
+               } else {
+                       return getNestedDelta(converter.extract(oldDataPoint), converter.extract(newDataPoint));
+               }
+
+       }
+
+       /**
+        * This method is exactly the same as
+        * {@link DeltaFunction#getDelta(Object, Object)} except that it gets the
+        * result of the previously done extractions as input. Therefore, this
+        * method only does the actual calculation of the delta but no data
+        * extraction or conversion.
+        * 
+        * @param oldDataPoint
+        *            the older data point.
+        * @param newDataPoint
+        *            the new data point.
+        * @return the delta between the two points.
+        */
+       public abstract double getNestedDelta(TO oldDataPoint, TO newDataPoint);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
deleted file mode 100644
index 77486d0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-
-/**
- * This delta function calculates the cosine distance between two given vectors.
- * The cosine distance is defined as: cosineDistance=1-cosineSimilarity
- * 
- * Cosine similarity: http://en.wikipedia.org/wiki/Cosine_similarity
- * 
- * @param <DATA>
- *            The input data type. This delta function works with a double[],
- *            but can extract/convert to it from any other given object in case
- *            the respective extractor has been set. See
- *            {@link ExtractionAwareDeltaFunction} for more information.
- */
-public class CosineDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> {
-
-       /**
-        * auto-generated id
-        */
-       private static final long serialVersionUID = -1217813582965151599L;
-
-       public CosineDistance() {
-               super(null);
-       }
-
-       public CosineDistance(Extractor<DATA, double[]> converter) {
-               super(converter);
-       }
-
-       @Override
-       public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
-               if (isNullvector(oldDataPoint, newDataPoint)) {
-                       return 0;
-               }
-
-               if (oldDataPoint.length != newDataPoint.length) {
-                       throw new IllegalArgumentException(
-                                       "The size of two input arrays are not same, can not compute cosine distance");
-               }
-
-               double sum1 = 0;
-               double sum2 = 0;
-               for (int i = 0; i < oldDataPoint.length; i++) {
-                       sum1 += oldDataPoint[i] * oldDataPoint[i];
-                       sum2 += newDataPoint[i] * newDataPoint[i];
-               }
-               sum1 = Math.sqrt(sum1);
-               sum2 = Math.sqrt(sum2);
-
-               return 1d - (dotProduct(oldDataPoint, newDataPoint) / (sum1 * sum2));
-       }
-
-       private double dotProduct(double[] a, double[] b) {
-               double result = 0;
-               for (int i = 0; i < a.length; i++) {
-                       result += a[i] * b[i];
-               }
-               return result;
-       }
-
-       private boolean isNullvector(double[]... vectors) {
-               outer: for (double[] v : vectors) {
-                       for (double field : v) {
-                               if (field != 0) {
-                                       continue outer;
-                               }
-                       }
-                       // This position is only reached in case all fields are 0.
-                       return true;
-               }
-               return false;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java
deleted file mode 100644
index b2223d6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import java.io.Serializable;
-
-/**
- * This interface allows the implementation of a function which calculates the
- * delta between two data points. Delta functions might be used in delta
- * policies and allow flexible adaptive windowing based on the arriving data
- * points.
- *
- * @param <DATA>
- *            The type of input data which can be compared using this function.
- */
-public interface DeltaFunction<DATA> extends Serializable {
-
-       /**
-        * Calculates the delta between two given data points.
-        * 
-        * @param oldDataPoint
-        *            the old data point.
-        * @param newDataPoint
-        *            the new data point.
-        * @return the delta between the two given points.
-        */
-       public double getDelta(DATA oldDataPoint, DATA newDataPoint);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
deleted file mode 100644
index 9d055d5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-
-/**
- * This delta function calculates the euclidean distance between two given
- * points.
- * 
- * Euclidean distance: http://en.wikipedia.org/wiki/Euclidean_distance
- * 
- * @param <DATA>
- *            The input data type. This delta function works with a double[],
- *            but can extract/convert to it from any other given object in case
- *            the respective extractor has been set. See
- *            {@link ExtractionAwareDeltaFunction} for more information.
- */
-public class EuclideanDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> {
-
-       public EuclideanDistance() {
-               super(null);
-       }
-
-       public EuclideanDistance(Extractor<DATA, double[]> converter) {
-               super(converter);
-       }
-
-       /**
-        * auto-generated version id
-        */
-       private static final long serialVersionUID = 3119432599634512359L;
-
-       @Override
-       public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) {
-               double result = 0;
-               for (int i = 0; i < oldDataPoint.length; i++) {
-                       result += (oldDataPoint[i] - newDataPoint[i]) * (oldDataPoint[i] - newDataPoint[i]);
-               }
-               return Math.sqrt(result);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
deleted file mode 100644
index 3e9f2ca..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.deltafunction;
-
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-
-/**
- * Extend this abstract class to implement a delta function which is aware of
- * extracting the data on which the delta is calculated from a more complex data
- * structure. For example in case you want to be able to run a delta only on one
- * field of a Tuple type or only on some fields from an array.
- * 
- * @param <DATA>
- *            The input data type. The input of this type will be passed to the
- *            extractor which will transform into a TO-object. The delta
- *            function then runs on this TO-object.
- * @param <TO>
- *            The type on which the delta function runs. (The type of the delta
- *            function)
- */
-public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFunction<DATA> {
-
-       /**
-        * Generated Version ID
-        */
-       private static final long serialVersionUID = 6927486219702689554L;
-       private Extractor<DATA, TO> converter;
-
-       public ExtractionAwareDeltaFunction(Extractor<DATA, TO> converter) {
-               this.converter = converter;
-       }
-
-       /**
-        * This method takes the two data point and runs the set extractor on it.
-        * The delta function implemented at {@link #getNestedDelta} is then called
-        * with the extracted data. In case no extractor is set the input data gets
-        * passes to {@link #getNestedDelta} as-is. The return value is just
-        * forwarded from {@link #getNestedDelta}.
-        * 
-        * @param oldDataPoint
-        *            the older data point as raw data (before extraction).
-        * @param newDataPoint
-        *            the new data point as raw data (before extraction).
-        * @return the delta between the two points.
-        */
-       @SuppressWarnings("unchecked")
-       @Override
-       public double getDelta(DATA oldDataPoint, DATA newDataPoint) {
-               if (converter == null) {
-                       // In case no conversion/extraction is required, we can cast DATA to
-                       // TO
-                       // => Therefore, "unchecked" warning is suppressed for this method.
-                       return getNestedDelta((TO) oldDataPoint, (TO) newDataPoint);
-               } else {
-                       return getNestedDelta(converter.extract(oldDataPoint), converter.extract(newDataPoint));
-               }
-
-       }
-
-       /**
-        * This method is exactly the same as
-        * {@link DeltaFunction#getDelta(Object, Object)} except that it gets the
-        * result of the previously done extractions as input. Therefore, this
-        * method only does the actual calculation of the delta but no data
-        * extraction or conversion.
-        * 
-        * @param oldDataPoint
-        *            the older data point.
-        * @param newDataPoint
-        *            the new data point.
-        * @return the delta between the two points.
-        */
-       public abstract double getNestedDelta(TO oldDataPoint, TO newDataPoint);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
index 255049d..31063ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.windowing.helper;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
index 0583176..0b6a493 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 
 /**
  * This policy calculates a delta between the data point which triggered last

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
index f2c253c..abc1a18 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.helper.Count;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
index e12b254..bdc7e94 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.windowing.deltafunction;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.streaming.api.functions.windowing.delta.CosineDistance;
 import org.junit.Test;
 
 public class CosineDistanceTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
index 8c62497..85a0882 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.windowing.deltafunction;
 
 import static org.junit.Assert.*;
 
+import org.apache.flink.streaming.api.functions.windowing.delta.EuclideanDistance;
 import org.junit.Test;
 
 public class EuclideanDistanceTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
index 448377d..9ec4644 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.policy;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.junit.Test;
 
 import java.util.List;
@@ -85,4 +85,4 @@ public class DeltaPolicyTest {
                                0, 0), 3, SERIALIZER));
 
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index e48b437..1b48387 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.helper.Delta;
 import org.apache.flink.streaming.api.windowing.helper.Time;
 import org.apache.flink.streaming.api.windowing.helper.Timestamp;

http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
index f490726..461ad3c 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.api.scala.windowing
 
 import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
 import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta }
-import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction
 
 object Delta {
 

Reply via email to