[FLINK-8550][table] Iterate over entryset instead of keys

This closes #5404.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e41eaab Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e41eaab Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e41eaab Branch: refs/heads/master Commit: 5e41eaab9c9feff743ca79ddbee11704ceaa2b2d Parents: d5d584e Author: hequn8128 <[email protected]> Authored: Fri Feb 2 17:33:50 2018 +0800 Committer: zentol <[email protected]> Committed: Tue Feb 6 20:20:47 2018 +0100 ---------------------------------------------------------------------- .../table/runtime/aggregate/ProcTimeBoundedRangeOver.scala | 7 ++++--- .../table/runtime/aggregate/RowTimeBoundedRangeOver.scala | 9 +++++---- .../table/runtime/utils/JavaUserDefinedAggFunctions.java | 9 ++++++--- .../flink/table/utils/UserDefinedTableFunctions.scala | 9 +++++---- 4 files changed, 20 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5e41eaab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala index 1d947a0..e00c7ac 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala @@ -147,13 +147,14 @@ class ProcTimeBoundedRangeOver( // when we find timestamps that are out of interest, we retrieve corresponding elements // and eliminate them. 
Multiple elements could have been received at the same timestamp // the removal of old elements happens only once per proctime as onTimer is called only once - val iter = rowMapState.keys.iterator + val iter = rowMapState.iterator val markToRemove = new ArrayList[Long]() while (iter.hasNext) { - val elementKey = iter.next + val entry = iter.next() + val elementKey = entry.getKey if (elementKey < limit) { // element key outside of window. Retract values - val elementsRemove = rowMapState.get(elementKey) + val elementsRemove = entry.getValue var iRemove = 0 while (iRemove < elementsRemove.size()) { val retractRow = elementsRemove.get(iRemove) http://git-wip-us.apache.org/repos/asf/flink/blob/5e41eaab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala index 85c523e..b13acdf 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala @@ -192,12 +192,13 @@ class RowTimeBoundedRangeOver( val retractTsList: JList[Long] = new JArrayList[Long] // do retraction - val dataTimestampIt = dataState.keys.iterator - while (dataTimestampIt.hasNext) { - val dataTs: Long = dataTimestampIt.next() + val iter = dataState.iterator() + while (iter.hasNext) { + val entry = iter.next() + val dataTs: Long = entry.getKey val offset = timestamp - dataTs if (offset > precedingOffset) { - val retractDataList = dataState.get(dataTs) + val retractDataList = entry.getValue dataListIndex = 0 while (dataListIndex < retractDataList.size()) { val 
retractRow = retractDataList.get(dataListIndex) http://git-wip-us.apache.org/repos/asf/flink/blob/5e41eaab/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java index abf2c49..0483c40 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java @@ -25,6 +25,7 @@ import org.apache.flink.table.api.dataview.MapView; import org.apache.flink.table.functions.AggregateFunction; import java.util.Iterator; +import java.util.Map; /** * Test aggregator functions. 
@@ -223,10 +224,12 @@ public class JavaUserDefinedAggFunctions { acc.count += mergeAcc.count; try { - Iterator<String> itrMap = mergeAcc.map.keys().iterator(); + Iterator itrMap = mergeAcc.map.iterator(); while (itrMap.hasNext()) { - String key = itrMap.next(); - Integer cnt = mergeAcc.map.get(key); + Map.Entry<String, Integer> entry = + (Map.Entry<String, Integer>) itrMap.next(); + String key = entry.getKey(); + Integer cnt = entry.getValue(); if (acc.map.contains(key)) { acc.map.put(key, acc.map.get(key) + cnt); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/5e41eaab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala index 9060db5..1d8b504 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala @@ -85,10 +85,11 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[ val splits = user.split("#") if (null != data) { if (null != conf && conf.size > 0) { - val it = conf.keys.iterator - while (it.hasNext) { - val key = it.next() - val value = conf.get(key).get + val iter = conf.iterator + while (iter.hasNext) { + val entry = iter.next() + val key = entry._1 + val value = entry._2 collect( SimpleUser( data.concat("_key=")
