Move delta window functions to package functions.windowing.delta
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3be2dc1a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3be2dc1a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3be2dc1a Branch: refs/heads/master Commit: 3be2dc1aaaeae4cbbfaecaf4998a64f1199260eb Parents: 6610cae Author: Aljoscha Krettek <[email protected]> Authored: Fri Sep 25 11:34:10 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Sep 28 17:04:17 2015 +0200 ---------------------------------------------------------------------- .../windowing/delta/CosineDistance.java | 92 ++++++++++++++++++++ .../windowing/delta/DeltaFunction.java | 44 ++++++++++ .../windowing/delta/EuclideanDistance.java | 58 ++++++++++++ .../delta/ExtractionAwareDeltaFunction.java | 90 +++++++++++++++++++ .../windowing/deltafunction/CosineDistance.java | 92 -------------------- .../windowing/deltafunction/DeltaFunction.java | 44 ---------- .../deltafunction/EuclideanDistance.java | 58 ------------ .../ExtractionAwareDeltaFunction.java | 90 ------------------- .../streaming/api/windowing/helper/Delta.java | 2 +- .../api/windowing/policy/DeltaPolicy.java | 2 +- .../api/complex/ComplexIntegrationTest.java | 2 +- .../deltafunction/CosineDistanceTest.java | 1 + .../deltafunction/EuclideanDistanceTest.java | 1 + .../api/windowing/policy/DeltaPolicyTest.java | 4 +- .../examples/windowing/TopSpeedWindowing.java | 2 +- .../streaming/api/scala/windowing/Delta.scala | 2 +- 16 files changed, 293 insertions(+), 291 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java new file mode 100644 index 0000000..7859b2c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/CosineDistance.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.windowing.delta; + +import org.apache.flink.streaming.api.windowing.extractor.Extractor; + +/** + * This delta function calculates the cosine distance between two given vectors. + * The cosine distance is defined as: cosineDistance=1-cosineSimilarity + * + * Cosine similarity: http://en.wikipedia.org/wiki/Cosine_similarity + * + * @param <DATA> + * The input data type. This delta function works with a double[], + * but can extract/convert to it from any other given object in case + * the respective extractor has been set. See + * {@link ExtractionAwareDeltaFunction} for more information. + */ +public class CosineDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> { + + /** + * auto-generated id + */ + private static final long serialVersionUID = -1217813582965151599L; + + public CosineDistance() { + super(null); + } + + public CosineDistance(Extractor<DATA, double[]> converter) { + super(converter); + } + + @Override + public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) { + if (isNullvector(oldDataPoint, newDataPoint)) { + return 0; + } + + if (oldDataPoint.length != newDataPoint.length) { + throw new IllegalArgumentException( + "The size of two input arrays are not same, can not compute cosine distance"); + } + + double sum1 = 0; + double sum2 = 0; + for (int i = 0; i < oldDataPoint.length; i++) { + sum1 += oldDataPoint[i] * oldDataPoint[i]; + sum2 += newDataPoint[i] * newDataPoint[i]; + } + sum1 = Math.sqrt(sum1); + sum2 = Math.sqrt(sum2); + + return 1d - (dotProduct(oldDataPoint, newDataPoint) / (sum1 * sum2)); + } + + private double dotProduct(double[] a, double[] b) { + double result = 0; + for (int i = 0; i < a.length; i++) { + result += a[i] * b[i]; + } + return result; + } + + private boolean isNullvector(double[]... vectors) { + outer: for (double[] v : vectors) { + for (double field : v) { + if (field != 0) { + continue outer; + } + } + // This position is only reached in case all fields are 0. + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java new file mode 100644 index 0000000..0ce2bf9 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/DeltaFunction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.windowing.delta; + +import java.io.Serializable; + +/** + * This interface allows the implementation of a function which calculates the + * delta between two data points. Delta functions might be used in delta + * policies and allow flexible adaptive windowing based on the arriving data + * points. + * + * @param <DATA> + * The type of input data which can be compared using this function. + */ +public interface DeltaFunction<DATA> extends Serializable { + + /** + * Calculates the delta between two given data points. + * + * @param oldDataPoint + * the old data point. + * @param newDataPoint + * the new data point. + * @return the delta between the two given points. + */ + public double getDelta(DATA oldDataPoint, DATA newDataPoint); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java new file mode 100644 index 0000000..f9e8ec7 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/EuclideanDistance.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.windowing.delta; + +import org.apache.flink.streaming.api.windowing.extractor.Extractor; + +/** + * This delta function calculates the euclidean distance between two given + * points. + * + * Euclidean distance: http://en.wikipedia.org/wiki/Euclidean_distance + * + * @param <DATA> + * The input data type. This delta function works with a double[], + * but can extract/convert to it from any other given object in case + * the respective extractor has been set. See + * {@link ExtractionAwareDeltaFunction} for more information. + */ +public class EuclideanDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> { + + public EuclideanDistance() { + super(null); + } + + public EuclideanDistance(Extractor<DATA, double[]> converter) { + super(converter); + } + + /** + * auto-generated version id + */ + private static final long serialVersionUID = 3119432599634512359L; + + @Override + public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) { + double result = 0; + for (int i = 0; i < oldDataPoint.length; i++) { + result += (oldDataPoint[i] - newDataPoint[i]) * (oldDataPoint[i] - newDataPoint[i]); + } + return Math.sqrt(result); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java new file mode 100644 index 0000000..bd5b0b9 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/windowing/delta/ExtractionAwareDeltaFunction.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.windowing.delta; + +import org.apache.flink.streaming.api.windowing.extractor.Extractor; + +/** + * Extend this abstract class to implement a delta function which is aware of + * extracting the data on which the delta is calculated from a more complex data + * structure. For example in case you want to be able to run a delta only on one + * field of a Tuple type or only on some fields from an array. + * + * @param <DATA> + * The input data type. The input of this type will be passed to the + * extractor which will transform into a TO-object. The delta + * function then runs on this TO-object. + * @param <TO> + * The type on which the delta function runs. (The type of the delta + * function) + */ +public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFunction<DATA> { + + /** + * Generated Version ID + */ + private static final long serialVersionUID = 6927486219702689554L; + private Extractor<DATA, TO> converter; + + public ExtractionAwareDeltaFunction(Extractor<DATA, TO> converter) { + this.converter = converter; + } + + /** + * This method takes the two data point and runs the set extractor on it. + * The delta function implemented at {@link #getNestedDelta} is then called + * with the extracted data. In case no extractor is set the input data gets + * passes to {@link #getNestedDelta} as-is. The return value is just + * forwarded from {@link #getNestedDelta}. + * + * @param oldDataPoint + * the older data point as raw data (before extraction). + * @param newDataPoint + * the new data point as raw data (before extraction). + * @return the delta between the two points. + */ + @SuppressWarnings("unchecked") + @Override + public double getDelta(DATA oldDataPoint, DATA newDataPoint) { + if (converter == null) { + // In case no conversion/extraction is required, we can cast DATA to + // TO + // => Therefore, "unchecked" warning is suppressed for this method. + return getNestedDelta((TO) oldDataPoint, (TO) newDataPoint); + } else { + return getNestedDelta(converter.extract(oldDataPoint), converter.extract(newDataPoint)); + } + + } + + /** + * This method is exactly the same as + * {@link DeltaFunction#getDelta(Object, Object)} except that it gets the + * result of the previously done extractions as input. Therefore, this + * method only does the actual calculation of the delta but no data + * extraction or conversion. + * + * @param oldDataPoint + * the older data point. + * @param newDataPoint + * the new data point. + * @return the delta between the two points. + */ + public abstract double getNestedDelta(TO oldDataPoint, TO newDataPoint); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java deleted file mode 100644 index 77486d0..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistance.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.deltafunction; - -import org.apache.flink.streaming.api.windowing.extractor.Extractor; - -/** - * This delta function calculates the cosine distance between two given vectors. - * The cosine distance is defined as: cosineDistance=1-cosineSimilarity - * - * Cosine similarity: http://en.wikipedia.org/wiki/Cosine_similarity - * - * @param <DATA> - * The input data type. This delta function works with a double[], - * but can extract/convert to it from any other given object in case - * the respective extractor has been set. See - * {@link ExtractionAwareDeltaFunction} for more information. - */ -public class CosineDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> { - - /** - * auto-generated id - */ - private static final long serialVersionUID = -1217813582965151599L; - - public CosineDistance() { - super(null); - } - - public CosineDistance(Extractor<DATA, double[]> converter) { - super(converter); - } - - @Override - public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) { - if (isNullvector(oldDataPoint, newDataPoint)) { - return 0; - } - - if (oldDataPoint.length != newDataPoint.length) { - throw new IllegalArgumentException( - "The size of two input arrays are not same, can not compute cosine distance"); - } - - double sum1 = 0; - double sum2 = 0; - for (int i = 0; i < oldDataPoint.length; i++) { - sum1 += oldDataPoint[i] * oldDataPoint[i]; - sum2 += newDataPoint[i] * newDataPoint[i]; - } - sum1 = Math.sqrt(sum1); - sum2 = Math.sqrt(sum2); - - return 1d - (dotProduct(oldDataPoint, newDataPoint) / (sum1 * sum2)); - } - - private double dotProduct(double[] a, double[] b) { - double result = 0; - for (int i = 0; i < a.length; i++) { - result += a[i] * b[i]; - } - return result; - } - - private boolean isNullvector(double[]... vectors) { - outer: for (double[] v : vectors) { - for (double field : v) { - if (field != 0) { - continue outer; - } - } - // This position is only reached in case all fields are 0. - return true; - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java deleted file mode 100644 index b2223d6..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/DeltaFunction.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.deltafunction; - -import java.io.Serializable; - -/** - * This interface allows the implementation of a function which calculates the - * delta between two data points. Delta functions might be used in delta - * policies and allow flexible adaptive windowing based on the arriving data - * points. - * - * @param <DATA> - * The type of input data which can be compared using this function. - */ -public interface DeltaFunction<DATA> extends Serializable { - - /** - * Calculates the delta between two given data points. - * - * @param oldDataPoint - * the old data point. - * @param newDataPoint - * the new data point. - * @return the delta between the two given points. - */ - public double getDelta(DATA oldDataPoint, DATA newDataPoint); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java deleted file mode 100644 index 9d055d5..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistance.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.deltafunction; - -import org.apache.flink.streaming.api.windowing.extractor.Extractor; - -/** - * This delta function calculates the euclidean distance between two given - * points. - * - * Euclidean distance: http://en.wikipedia.org/wiki/Euclidean_distance - * - * @param <DATA> - * The input data type. This delta function works with a double[], - * but can extract/convert to it from any other given object in case - * the respective extractor has been set. See - * {@link ExtractionAwareDeltaFunction} for more information. - */ -public class EuclideanDistance<DATA> extends ExtractionAwareDeltaFunction<DATA, double[]> { - - public EuclideanDistance() { - super(null); - } - - public EuclideanDistance(Extractor<DATA, double[]> converter) { - super(converter); - } - - /** - * auto-generated version id - */ - private static final long serialVersionUID = 3119432599634512359L; - - @Override - public double getNestedDelta(double[] oldDataPoint, double[] newDataPoint) { - double result = 0; - for (int i = 0; i < oldDataPoint.length; i++) { - result += (oldDataPoint[i] - newDataPoint[i]) * (oldDataPoint[i] - newDataPoint[i]); - } - return Math.sqrt(result); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java deleted file mode 100644 index 3e9f2ca..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.windowing.deltafunction; - -import org.apache.flink.streaming.api.windowing.extractor.Extractor; - -/** - * Extend this abstract class to implement a delta function which is aware of - * extracting the data on which the delta is calculated from a more complex data - * structure. For example in case you want to be able to run a delta only on one - * field of a Tuple type or only on some fields from an array. - * - * @param <DATA> - * The input data type. The input of this type will be passed to the - * extractor which will transform into a TO-object. The delta - * function then runs on this TO-object. - * @param <TO> - * The type on which the delta function runs. (The type of the delta - * function) - */ -public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFunction<DATA> { - - /** - * Generated Version ID - */ - private static final long serialVersionUID = 6927486219702689554L; - private Extractor<DATA, TO> converter; - - public ExtractionAwareDeltaFunction(Extractor<DATA, TO> converter) { - this.converter = converter; - } - - /** - * This method takes the two data point and runs the set extractor on it. - * The delta function implemented at {@link #getNestedDelta} is then called - * with the extracted data. In case no extractor is set the input data gets - * passes to {@link #getNestedDelta} as-is. The return value is just - * forwarded from {@link #getNestedDelta}. - * - * @param oldDataPoint - * the older data point as raw data (before extraction). - * @param newDataPoint - * the new data point as raw data (before extraction). - * @return the delta between the two points. - */ - @SuppressWarnings("unchecked") - @Override - public double getDelta(DATA oldDataPoint, DATA newDataPoint) { - if (converter == null) { - // In case no conversion/extraction is required, we can cast DATA to - // TO - // => Therefore, "unchecked" warning is suppressed for this method. - return getNestedDelta((TO) oldDataPoint, (TO) newDataPoint); - } else { - return getNestedDelta(converter.extract(oldDataPoint), converter.extract(newDataPoint)); - } - - } - - /** - * This method is exactly the same as - * {@link DeltaFunction#getDelta(Object, Object)} except that it gets the - * result of the previously done extractions as input. Therefore, this - * method only does the actual calculation of the delta but no data - * extraction or conversion. - * - * @param oldDataPoint - * the older data point. - * @param newDataPoint - * the new data point. - * @return the delta between the two points. - */ - public abstract double getNestedDelta(TO oldDataPoint, TO newDataPoint); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java index 255049d..31063ab 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.windowing.helper; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; +import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction; import org.apache.flink.streaming.api.windowing.policy.DeltaPolicy; import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy; import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy; http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java index 0583176..0b6a493 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java @@ -26,7 +26,7 @@ import java.util.List; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.util.DataInputDeserializer; import org.apache.flink.runtime.util.DataOutputSerializer; -import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; +import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction; /** * This policy calculates a delta between the data point which triggered last http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java index f2c253c..abc1a18 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/complex/ComplexIntegrationTest.java @@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.functions.WindowMapFunction; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; +import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction; import org.apache.flink.streaming.api.windowing.helper.Count; import org.apache.flink.streaming.api.windowing.helper.Delta; import org.apache.flink.streaming.api.windowing.helper.Time; http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java index e12b254..bdc7e94 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/CosineDistanceTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.windowing.deltafunction; import static org.junit.Assert.*; +import org.apache.flink.streaming.api.functions.windowing.delta.CosineDistance; import org.junit.Test; public class CosineDistanceTest { http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java index 8c62497..85a0882 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/deltafunction/EuclideanDistanceTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.windowing.deltafunction; import static org.junit.Assert.*; +import org.apache.flink.streaming.api.functions.windowing.delta.EuclideanDistance; import org.junit.Test; public class EuclideanDistanceTest { http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java index 448377d..9ec4644 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicyTest.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.windowing.policy; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; +import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction; import org.junit.Test; import java.util.List; @@ -85,4 +85,4 @@ public class DeltaPolicyTest { 0, 0), 3, SERIALIZER)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java index e48b437..1b48387 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java @@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction; +import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction; import org.apache.flink.streaming.api.windowing.helper.Delta; import org.apache.flink.streaming.api.windowing.helper.Time; import org.apache.flink.streaming.api.windowing.helper.Timestamp; http://git-wip-us.apache.org/repos/asf/flink/blob/3be2dc1a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala index f490726..461ad3c 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/windowing/Delta.scala @@ -19,8 +19,8 @@ package org.apache.flink.streaming.api.scala.windowing import org.apache.flink.api.scala.ClosureCleaner +import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction import org.apache.flink.streaming.api.windowing.helper.{ Delta => JavaDelta } -import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction object Delta {
