This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1da3055  [FLINK-13869][table-planner-blink][hive] Fix Hive functions can not work in blink planner streaming mode (#10013)
1da3055 is described below

commit 1da3055338e5f23a3ec4a54152dbf6b4e35e3862
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Oct 30 20:11:52 2019 +0800

    [FLINK-13869][table-planner-blink][hive] Fix Hive functions can not work in blink planner streaming mode (#10013)
---
 .../catalog/hive/HiveCatalogUseBlinkITCase.java    | 163 +++++++++++++--------
 .../planner/functions/utils/AggSqlFunction.scala   |   6 +-
 .../functions/utils/ScalarSqlFunction.scala        |   4 +
 .../planner/functions/utils/TableSqlFunction.scala |   3 +-
 .../table/planner/plan/utils/AggregateUtil.scala   |   2 +-
 5 files changed, 116 insertions(+), 62 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
index d610125..343c8c9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogUseBlinkITCase.java
@@ -18,12 +18,17 @@
 
 package org.apache.flink.table.catalog.hive;
 
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.CatalogFunctionImpl;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableBuilder;
@@ -35,7 +40,10 @@ import org.apache.flink.table.functions.hive.util.TestHiveGenericUDF;
 import org.apache.flink.table.functions.hive.util.TestHiveSimpleUDF;
 import org.apache.flink.table.functions.hive.util.TestHiveUDTF;
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
+import org.apache.flink.table.util.JavaScalaConversionUtil;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.FileUtils;
 
 import com.klarna.hiverunner.HiveShell;
@@ -54,6 +62,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -96,14 +105,6 @@ public class HiveCatalogUseBlinkITCase extends AbstractTestBase {
 
        @Test
        public void testBlinkUdf() throws Exception {
-               TableEnvironment tEnv = TableEnvironment.create(
-                               EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
-
-               BatchTestBase.configForMiniCluster(tEnv.getConfig());
-
-               tEnv.registerCatalog("myhive", hiveCatalog);
-               tEnv.useCatalog("myhive");
-
                TableSchema schema = TableSchema.builder()
                                .field("name", DataTypes.STRING())
                                .field("age", DataTypes.INT())
@@ -122,41 +123,12 @@ public class HiveCatalogUseBlinkITCase extends AbstractTestBase {
                                                .withComment("Comment.")
                                                .build();
 
-               Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), "test.csv");
-
-               TableSchema sinkSchema = TableSchema.builder()
-                               .field("name1", Types.STRING())
-                               .field("name2", Types.STRING())
-                               .field("sum1", Types.INT())
-                               .field("sum2", Types.LONG())
-                               .build();
-
-               FormatDescriptor sinkFormat = new OldCsv()
-                               .field("name1", Types.STRING())
-                               .field("name2", Types.STRING())
-                               .field("sum1", Types.INT())
-                               .field("sum2", Types.LONG());
-               CatalogTable sink =
-                               new CatalogTableBuilder(
-                                               new FileSystem().path(p.toAbsolutePath().toString()),
-                                               sinkSchema)
-                                               .withFormat(sinkFormat)
-                                               .inAppendMode()
-                                               .withComment("Comment.")
-                                               .build();
-
                hiveCatalog.createTable(
                                new ObjectPath(HiveCatalog.DEFAULT_DB, sourceTableName),
                                source,
                                false
                );
 
-               hiveCatalog.createTable(
-                               new ObjectPath(HiveCatalog.DEFAULT_DB, sinkTableName),
-                               sink,
-                               false
-               );
-
                hiveCatalog.createFunction(
                                new ObjectPath(HiveCatalog.DEFAULT_DB, "myudf"),
                                new CatalogFunctionImpl(TestHiveSimpleUDF.class.getCanonicalName(), new HashMap<>()),
@@ -174,34 +146,107 @@ public class HiveCatalogUseBlinkITCase extends AbstractTestBase {
                                new CatalogFunctionImpl(GenericUDAFSum.class.getCanonicalName(), new HashMap<>()),
                                false);
 
+               testUdf(true);
+               testUdf(false);
+       }
+
+       private void testUdf(boolean batch) throws Exception {
+               TableEnvironment tEnv;
+               EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance().useBlinkPlanner();
+               if (batch) {
+                       envBuilder.inBatchMode();
+               } else {
+                       envBuilder.inStreamingMode();
+               }
+               if (batch) {
+                       tEnv = TableEnvironment.create(envBuilder.build());
+               } else {
+                       tEnv = StreamTableEnvironment.create(
+                                       StreamExecutionEnvironment.getExecutionEnvironment(), envBuilder.build());
+               }
+
+               BatchTestBase.configForMiniCluster(tEnv.getConfig());
+
+               tEnv.registerCatalog("myhive", hiveCatalog);
+               tEnv.useCatalog("myhive");
+
                String innerSql = format("select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b," +
                                " s from %s, lateral table(myudtf(name, 1)) as T(s)", sourceTableName);
 
-               tEnv.sqlUpdate(
-                               format("insert into %s select a, s, sum(b), myudaf(b) from (%s) group by a, s",
-                                               sinkTableName,
-                                               innerSql));
-               tEnv.execute("myjob");
-
-               // assert written result
-               StringBuilder builder = new StringBuilder();
-               try (Stream<Path> paths = Files.walk(Paths.get(p.toAbsolutePath().toString()))) {
-                       paths.filter(Files::isRegularFile).forEach(path -> {
-                               try {
-                                       String content = FileUtils.readFileUtf8(path.toFile());
-                                       if (content.isEmpty()) {
-                                               return;
+               String selectSql = format("select a, s, sum(b), myudaf(b) from (%s) group by a, s", innerSql);
+
+               List<String> results;
+               if (batch) {
+                       Path p = Paths.get(tempFolder.newFolder().getAbsolutePath(), "test.csv");
+
+                       TableSchema sinkSchema = TableSchema.builder()
+                                       .field("name1", Types.STRING())
+                                       .field("name2", Types.STRING())
+                                       .field("sum1", Types.INT())
+                                       .field("sum2", Types.LONG())
+                                       .build();
+
+                       FormatDescriptor sinkFormat = new OldCsv()
+                                       .field("name1", Types.STRING())
+                                       .field("name2", Types.STRING())
+                                       .field("sum1", Types.INT())
+                                       .field("sum2", Types.LONG());
+                       CatalogTable sink =
+                                       new CatalogTableBuilder(
+                                                       new FileSystem().path(p.toAbsolutePath().toString()),
+                                                       sinkSchema)
+                                                       .withFormat(sinkFormat)
+                                                       .inAppendMode()
+                                                       .withComment("Comment.")
+                                                       .build();
+
+                       hiveCatalog.createTable(
+                                       new ObjectPath(HiveCatalog.DEFAULT_DB, sinkTableName),
+                                       sink,
+                                       false
+                       );
+
+                       tEnv.sqlUpdate(format("insert into %s " + selectSql, sinkTableName));
+                       tEnv.execute("myjob");
+
+                       // assert written result
+                       StringBuilder builder = new StringBuilder();
+                       try (Stream<Path> paths = Files.walk(Paths.get(p.toAbsolutePath().toString()))) {
+                               paths.filter(Files::isRegularFile).forEach(path -> {
+                                       try {
+                                               String content = FileUtils.readFileUtf8(path.toFile());
+                                               if (content.isEmpty()) {
+                                                       return;
+                                               }
+                                               builder.append(content);
+                                       } catch (IOException e) {
+                                               throw new RuntimeException(e);
                                        }
-                                       builder.append(content);
-                               } catch (IOException e) {
-                                       throw new RuntimeException(e);
-                               }
-                       });
+                               });
+                       }
+                       results = Arrays.stream(builder.toString().split("\n"))
+                                       .filter(s -> !s.isEmpty())
+                                       .collect(Collectors.toList());
+               } else {
+                       StreamTableEnvironment streamTEnv = (StreamTableEnvironment) tEnv;
+                       TestingRetractSink sink = new TestingRetractSink();
+                       streamTEnv.toRetractStream(tEnv.sqlQuery(selectSql), Row.class)
+                                       .map(new JavaToScala())
+                                       .addSink((SinkFunction) sink);
+                       streamTEnv.execute("");
+                       results = JavaScalaConversionUtil.toJava(sink.getRetractResults());
                }
-               List<String> results = Arrays.stream(builder.toString().split("\n"))
-                               .filter(s -> !s.isEmpty())
-                               .collect(Collectors.toList());
+
+               results = new ArrayList<>(results);
                results.sort(String::compareTo);
                Assert.assertEquals(Arrays.asList("1,1,2,2", "2,2,4,4", "3,3,6,6"), results);
        }
+
+       private static class JavaToScala implements MapFunction<Tuple2<Boolean, Row>, scala.Tuple2<Boolean, Row>> {
+
+               @Override
+               public scala.Tuple2<Boolean, Row> map(Tuple2<Boolean, Row> value) throws Exception {
+                       return new scala.Tuple2<>(value.f0, value.f1);
+               }
+       }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
index 7effbba..5213929 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/AggSqlFunction.scala
@@ -51,7 +51,7 @@ import java.util
 class AggSqlFunction(
     name: String,
     displayName: String,
-    aggregateFunction: UserDefinedAggregateFunction[_, _],
+    val aggregateFunction: UserDefinedAggregateFunction[_, _],
     val externalResultType: DataType,
     val externalAccType: DataType,
     typeFactory: FlinkTypeFactory,
@@ -72,6 +72,10 @@ class AggSqlFunction(
     typeFactory
   ) {
 
+  /**
+    * This is a temporary solution for Hive UDFs and should be removed once FLIP-65 is finished;
+    * please pass non-null input arguments.
+    */
   def makeFunction(
       constants: Array[AnyRef],
       argTypes: Array[LogicalType]): UserDefinedAggregateFunction[_, _] = aggregateFunction
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
index 8e5609c..f036cbe 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/ScalarSqlFunction.scala
@@ -58,6 +58,10 @@ class ScalarSqlFunction(
     null,
     SqlFunctionCategory.USER_DEFINED_FUNCTION) {
 
+  /**
+    * This is a temporary solution for Hive UDFs and should be removed once FLIP-65 is finished;
+    * please pass non-null input arguments.
+    */
   def makeFunction(constants: Array[AnyRef], argTypes: Array[LogicalType]): ScalarFunction =
     scalarFunction
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
index c3f5ac3..4cb1d53 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
@@ -67,7 +67,8 @@ class TableSqlFunction(
     functionImpl) {
 
   /**
-    * Get the user-defined table function.
+    * This is a temporary solution for Hive UDFs and should be removed once FLIP-65 is finished;
+    * please pass non-null input arguments.
     */
   def makeFunction(constants: Array[AnyRef], argTypes: Array[LogicalType]): TableFunction[_] =
     udtf
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
index 92a5d3d..973a6da 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala
@@ -749,7 +749,7 @@ object AggregateUtil extends Enumeration {
   private[flink] def isTableAggregate(aggCalls: util.List[AggregateCall]): Boolean = {
     aggCalls
       .filter(e => e.getAggregation.isInstanceOf[AggSqlFunction])
-      .map(e => e.getAggregation.asInstanceOf[AggSqlFunction].makeFunction(null, null))
+      .map(e => e.getAggregation.asInstanceOf[AggSqlFunction].aggregateFunction)
       .exists(_.isInstanceOf[TableAggregateFunction[_, _]])
   }
 }
