[FLINK-8550][table] Iterate over entryset instead of keys

This closes #5404.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e41eaab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e41eaab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e41eaab

Branch: refs/heads/master
Commit: 5e41eaab9c9feff743ca79ddbee11704ceaa2b2d
Parents: d5d584e
Author: hequn8128 <[email protected]>
Authored: Fri Feb 2 17:33:50 2018 +0800
Committer: zentol <[email protected]>
Committed: Tue Feb 6 20:20:47 2018 +0100

----------------------------------------------------------------------
 .../table/runtime/aggregate/ProcTimeBoundedRangeOver.scala  | 7 ++++---
 .../table/runtime/aggregate/RowTimeBoundedRangeOver.scala   | 9 +++++----
 .../table/runtime/utils/JavaUserDefinedAggFunctions.java    | 9 ++++++---
 .../flink/table/utils/UserDefinedTableFunctions.scala       | 9 +++++----
 4 files changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5e41eaab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
index 1d947a0..e00c7ac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
@@ -147,13 +147,14 @@ class ProcTimeBoundedRangeOver(
     // when we find timestamps that are out of interest, we retrieve corresponding elements
     // and eliminate them. Multiple elements could have been received at the same timestamp
     // the removal of old elements happens only once per proctime as onTimer is called only once
-    val iter = rowMapState.keys.iterator
+    val iter = rowMapState.iterator
     val markToRemove = new ArrayList[Long]()
     while (iter.hasNext) {
-      val elementKey = iter.next
+      val entry = iter.next()
+      val elementKey = entry.getKey
       if (elementKey < limit) {
         // element key outside of window. Retract values
-        val elementsRemove = rowMapState.get(elementKey)
+        val elementsRemove = entry.getValue
         var iRemove = 0
         while (iRemove < elementsRemove.size()) {
           val retractRow = elementsRemove.get(iRemove)

http://git-wip-us.apache.org/repos/asf/flink/blob/5e41eaab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
index 85c523e..b13acdf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala
@@ -192,12 +192,13 @@ class RowTimeBoundedRangeOver(
       val retractTsList: JList[Long] = new JArrayList[Long]
 
       // do retraction
-      val dataTimestampIt = dataState.keys.iterator
-      while (dataTimestampIt.hasNext) {
-        val dataTs: Long = dataTimestampIt.next()
+      val iter = dataState.iterator()
+      while (iter.hasNext) {
+        val entry = iter.next()
+        val dataTs: Long = entry.getKey
         val offset = timestamp - dataTs
         if (offset > precedingOffset) {
-          val retractDataList = dataState.get(dataTs)
+          val retractDataList = entry.getValue
           dataListIndex = 0
           while (dataListIndex < retractDataList.size()) {
             val retractRow = retractDataList.get(dataListIndex)

http://git-wip-us.apache.org/repos/asf/flink/blob/5e41eaab/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
index abf2c49..0483c40 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.dataview.MapView;
 import org.apache.flink.table.functions.AggregateFunction;
 
 import java.util.Iterator;
+import java.util.Map;
 
 /**
  * Test aggregator functions.
@@ -223,10 +224,12 @@ public class JavaUserDefinedAggFunctions {
                                acc.count += mergeAcc.count;
 
                                try {
-                                       Iterator<String> itrMap = mergeAcc.map.keys().iterator();
+                                       Iterator itrMap = mergeAcc.map.iterator();
                                        while (itrMap.hasNext()) {
-                                               String key = itrMap.next();
-                                               Integer cnt = mergeAcc.map.get(key);
+                                               Map.Entry<String, Integer> entry =
+                                                               (Map.Entry<String, Integer>) itrMap.next();
+                                               String key = entry.getKey();
+                                               Integer cnt = entry.getValue();
                                                if (acc.map.contains(key)) {
                                                        acc.map.put(key, acc.map.get(key) + cnt);
                                                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/5e41eaab/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
index 9060db5..1d8b504 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
@@ -85,10 +85,11 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
       val splits = user.split("#")
       if (null != data) {
         if (null != conf && conf.size > 0) {
-          val it = conf.keys.iterator
-          while (it.hasNext) {
-            val key = it.next()
-            val value = conf.get(key).get
+          val iter = conf.iterator
+          while (iter.hasNext) {
+            val entry = iter.next()
+            val key = entry._1
+            val value = entry._2
             collect(
               SimpleUser(
                 data.concat("_key=")

Reply via email to