[ https://issues.apache.org/jira/browse/FLINK-21001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-21001:
-----------------------------------
    Labels: auto-deprioritized-major stale-minor  (was: auto-deprioritized-major)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help the community manage its development. I see this issue has been marked as Minor, but it is unassigned and neither it nor its Sub-Tasks have been updated for 180 days. I have therefore marked it "stale-minor". If this ticket is still Minor, please either assign yourself or post an update. Afterwards, please remove the label; otherwise the issue will be deprioritized in 7 days.

> Flink job is blocked while using tableEnvironment with tableFunction and join
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-21001
>                 URL: https://issues.apache.org/jira/browse/FLINK-21001
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.11.2
>        Environment: flink-1.11.2
>           Reporter: Wu
>           Priority: Minor
>             Labels: auto-deprioritized-major, stale-minor
>        Attachments: client_log.txt
>
>
> The code is as follows:
> {code:java}
> package com.oppo.recdata.datapipe;
>
> import com.oppo.recdata.datapipe.flink.transform.ExplodeDataTypeEnum;
> import com.oppo.recdata.datapipe.flink.transform.ExplodeModify;
> import com.oppo.recdata.datapipe.flink.transform.TableExplode;
> import com.oppo.recdata.datapipe.flink.transform.function.CollectMapAggregateFunction;
> import org.apache.flink.table.api.*;
>
> import static org.apache.flink.table.api.Expressions.row;
>
> /**
>  * @author wujianz...@oppo.com
>  */
> public class BatchTable {
>     public static void main(String[] args) {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>         TableEnvironment tableEnv = TableEnvironment.create(settings);
>         ExplodeModify modify = new ExplodeModify(ExplodeDataTypeEnum.string, null, "&");
>         tableEnv.createTemporarySystemFunction("explode", new TableExplode(modify));
>         tableEnv.createFunction("collect_map", CollectMapAggregateFunction.class);
>         Table table = tableEnv.fromValues(
>                 DataTypes.ROW(
>                         DataTypes.FIELD("buuid", DataTypes.STRING()),
>                         DataTypes.FIELD("docType", DataTypes.INT()),
>                         DataTypes.FIELD("viewTime", DataTypes.INT()),
>                         DataTypes.FIELD("subCategory", DataTypes.STRING())
>                 ),
>                 row("John", "1", "36", "NBA&football")
>         );
>         tableEnv.createTemporaryView("feeds_expose_click_profile", table);
>         Table add_profile = tableEnv.sqlQuery("select buuid, cast(docType as varchar) as docType, viewTime, subCategory from feeds_expose_click_profile where buuid is not null and docType is not null and viewTime > 0");
>         tableEnv.createTemporaryView("add_profile", add_profile);
>         Table cate2Click = tableEnv.sqlQuery("select buuid, docType, viewTime, cate2 from add_profile, LATERAL TABLE(explode(subCategory)) as t(cate2) where subCategory is not null");
>         tableEnv.createTemporaryView("cate2_click", cate2Click);
>         Table cate2_detail = tableEnv.sqlQuery("select cate2, sum(viewTime) as viewTimeSum, buuid, docType from cate2_click GROUP BY buuid, cate2, docType");
>         tableEnv.createTemporaryView("user_cate2_detail", cate2_detail);
>         Table user_global_cate2 = tableEnv.sqlQuery("select 'gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue, buuid as keyName, docType from cate2_click group by buuid, docType");
>         tableEnv.createTemporaryView("user_global_cate2", user_global_cate2);
>         Table global_user_cate2 = tableEnv.sqlQuery("select cate2 as fieldName, sum(viewTime) as fieldValue, 'guser_cate2_24h_click_sumtime' as keyName, docType from cate2_click group by cate2, docType");
>         tableEnv.createTemporaryView("global_user_cate2", global_user_cate2);
>         Table global_user_global_cate2 = tableEnv.sqlQuery("select 'guser_gcate2_24h_click_sumtime' as fieldName, sum(viewTime) as fieldValue, 'global_feature' as keyName, docType from cate2_click group by docType");
>         tableEnv.createTemporaryView("global_user_global_cate2", global_user_global_cate2);
>         Table cate2_cs_detail = tableEnv.sqlQuery("select a.cate2 as fieldName, (a.viewTimeSum + 0.2) / (b.fieldValue * c.fieldValue / d.fieldValue + 0.2) as fieldValue, a.buuid as keyName, a.docType from user_cate2_detail a join user_global_cate2 b on a.buuid = b.keyName and a.docType = b.docType join global_user_cate2 c on a.cate2 = c.fieldName and a.docType = c.docType join global_user_global_cate2 d on a.docType = d.docType where a.viewTimeSum > 0 and b.fieldValue > 0 and c.fieldValue > 0 and d.fieldValue > 0");
>         tableEnv.createTemporaryView("cate2_cs_detail", cate2_cs_detail);
>         Table cate2Cs = tableEnv.sqlQuery("select 'cate2_24h_click_sumtimeds' as fieldName, collect_map(fieldName, ROUND(fieldValue, 5)) as fieldValue, concat(docType, '#', keyName) as keyName from cate2_cs_detail where fieldValue < 0 or fieldValue >= 0 group by keyName, docType");
>         cate2Cs.execute().print();
>     }
> }
> {code}
> The client log is as follows:
> {code:java}
> "C:\Program Files\Java\jdk1.8.0_73\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA 2018.2.5\lib\idea_rt.jar=64196:D:\Program Files\JetBrains\IntelliJ IDEA 2018.2.5\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\80242151\AppData\Local\Temp\classpath403316789.jar com.oppo.recdata.datapipe.BatchTable
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.16/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.5/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/D:/lib/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.8.2/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/D:/lib/repository/org/slf4j/slf4j-log4j12/1.7.15/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 2021-01-17 15:05:25,639 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2021-01-17 15:05:25,645 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2021-01-17 15:05:25,652 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2021-01-17 15:05:25,653 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2021-01-17 15:05:25,656 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2021-01-17 15:05:25,656 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
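Note for readers without access to the reporter's private `TableExplode` / `ExplodeModify` classes: the repro constructs `ExplodeModify` with an "&" delimiter and applies it via `LATERAL TABLE(explode(subCategory))`, so the UDTF presumably expands each `subCategory` string into one row per "&"-separated token. A plain-Java sketch of that assumed split logic (hypothetical helper, not the reporter's code):

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Sketch of the expansion the reporter's explode() UDTF presumably performs:
// split subCategory on the configured delimiter, yielding one value per row.
class SubCategoryExplodeSketch {

    // Splits a subCategory string on the given delimiter, dropping empty tokens.
    static String[] explode(String subCategory, String delimiter) {
        return Arrays.stream(subCategory.split(Pattern.quote(delimiter)))
                .filter(token -> !token.isEmpty())
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // "NBA&football" expands into two cate2 values, i.e. two result rows.
        System.out.println(Arrays.toString(explode("NBA&football", "&")));
    }
}
```

Under this assumption, the single input row in the repro becomes two `cate2` rows ("NBA" and "football") before the downstream aggregations and four-way join, which is the plan on which the reporter observes the job blocking.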