[ 
https://issues.apache.org/jira/browse/FLINK-11074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715934#comment-16715934
 ] 

ASF GitHub Bot commented on FLINK-11074:
----------------------------------------

dianfu commented on a change in pull request #7253: [FLINK-11074] 
[table][tests] Enable harness tests with RocksdbStateBackend and add harness 
tests for CollectAggFunction
URL: https://github.com/apache/flink/pull/7253#discussion_r240445604
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ##########
 @@ -491,13 +489,75 @@ class HarnessTestBase {
     distinctCountFuncName,
     distinctCountAggCode)
 
+  def createHarnessTester[KEY, IN, OUT](
+      dataStream: DataStream[_],
+      prefixOperatorName: String)
+  : KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT] = {
+
+    val transformation = extractExpectedTransformation(
+      dataStream.javaStream.getTransformation,
+      prefixOperatorName).asInstanceOf[OneInputTransformation[_, _]]
+    if (transformation == null) {
+      throw new Exception("Can not find the expected transformation")
+    }
+
+    val processOperator = 
transformation.getOperator.asInstanceOf[OneInputStreamOperator[IN, OUT]]
+    val keySelector = 
transformation.getStateKeySelector.asInstanceOf[KeySelector[IN, KEY]]
+    val keyType = 
transformation.getStateKeyType.asInstanceOf[TypeInformation[KEY]]
+
+    createHarnessTester(processOperator, keySelector, keyType)
+      .asInstanceOf[KeyedOneInputStreamOperatorTestHarness[KEY, IN, OUT]]
+  }
+
+  private def extractExpectedTransformation(
+      transformation: StreamTransformation[_],
+      prefixOperatorName: String): StreamTransformation[_] = {
+    def extractFromInputs(inputs: StreamTransformation[_]*): 
StreamTransformation[_] = {
+      for (input <- inputs) {
+        val t = extractExpectedTransformation(input, prefixOperatorName)
+        if (t != null) {
+          return t
+        }
+      }
+      null
+    }
+
+    transformation match {
+      case one: OneInputTransformation[_, _] =>
+        if (one.getName.startsWith(prefixOperatorName)) {
+          one
+        } else {
+          extractExpectedTransformation(one.getInput, prefixOperatorName)
+        }
+      case two: TwoInputTransformation[_, _, _] =>
 
 Review comment:
   Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Improve the harness test to make it possible test with state backend
> --------------------------------------------------------------------
>
>                 Key: FLINK-11074
>                 URL: https://issues.apache.org/jira/browse/FLINK-11074
>             Project: Flink
>          Issue Type: Test
>          Components: Table API &amp; SQL
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, the harness test can only test without state backend. If you use a 
> DataView in the accumulator of the aggregate function, the DataView is a java 
> object and held in heap, not replaced with StateMapView/StateListView which 
> values are actually held in the state backend. We should improve the harness 
> test to make it possible to test with state backend. Otherwise, issues such 
> as FLINK-10674 could have never been found. With this harness test available, 
> we could test the built-in aggregate functions which use the DataView more 
> fine grained.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to