Re: Using Hive UDFs

2021-04-27 Thread 김영우
Thanks Shengkai and Rui for looking into this. A snippet from my app looks like the following: HiveCatalog hive = new HiveCatalog("flink-hive", "default", "/tmp/hive"); tableEnv.registerCatalog("flink-hive", hive); tableEnv.useCatalog("flink-hive");

Re: Using Hive UDFs

2021-04-27 Thread Rui Li
Hi Youngwoo, The catalog function is associated with a catalog and DB. Assuming you have created the function ST_Point in your metastore, could you verify whether the current catalog is your HiveCatalog and the current database is the database in which ST_Point is registered? On Wed, Apr 28,
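A minimal sketch of the check Rui describes, written with the PyFlink Table API for brevity; the catalog name, default database, and Hive conf path follow the snippet in this thread and are otherwise assumptions:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.catalog import HiveCatalog

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)

# Names and paths follow the thread; adjust to your setup.
hive = HiveCatalog("flink-hive", "default", "/tmp/hive")
t_env.register_catalog("flink-hive", hive)

# Catalog functions resolve against the *current* catalog and database,
# so switch to the catalog/DB where ST_Point was created.
t_env.use_catalog("flink-hive")
t_env.use_database("default")

# Sanity check: the function should now resolve without a fully qualified name.
print(t_env.list_user_defined_functions())
```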

Re: Using Hive UDFs

2021-04-27 Thread Shengkai Fang
Hi. The order of the loaded modules may influence how the function is resolved. [1] https://issues.apache.org/jira/browse/FLINK-22383 Youngwoo Kim (김영우) wrote on Wed, Apr 28, 2021 at 10:50 AM: > Hi, > > I've configured Hive metastore to use HiveCatalog in streaming > application. So far, most of the features are
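The resolution order FLINK-22383 refers to is the order in which modules were loaded. A rough sketch, assuming a PyFlink version that ships the HiveModule wrapper (verify the import against your release):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.module import HiveModule

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Loading the hive module exposes Hive's built-in functions to SQL.
# Function lookup walks the loaded modules in order, so where 'hive'
# sits relative to 'core' can change which implementation is picked.
t_env.load_module("hive", HiveModule())
```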

Re: flink sql: custom function returning nested rows fails with scala.MatchError

2021-04-27 Thread Shengkai Fang
hi, gen. It should be released fairly soon; the 1.13 release is expected around the May Day holiday (early May). Best, Shengkai gen wrote on Tue, Apr 27, 2021 at 8:57 PM: > hi, Shengkai > Thank you very much for your answer; it solved a problem that had troubled me for days. > Following your suggestion, I ran today's (2021-4-27) master build and it works fine, so the issue has indeed been fixed. > The version I had been using is 1.12.2. > > > The latest release is currently 1.12.2, which presumably does not include this fix yet. Do you know the release plan for 1.13? > > > > -- > Sent from:

Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Dian Fu
Thanks a lot~ > On Apr 28, 2021 at 8:25 AM, Yik San Chan wrote: > > Hi Dian, > > I followed up with this PR https://github.com/apache/flink/pull/15790 > > On Tue, Apr 27, 2021 at 11:03 PM Dian Fu > wrote: > Hi Yik San, > > Makes

Re: Fwd: flink1.12.2 CLI throws an exception when connecting to hive

2021-04-27 Thread guoyb
I have the same problem and have not solved it; it is caused by the Kerberos-authenticated Hive. --- Original message --- From: "张锴"

Using Hive UDFs

2021-04-27 Thread 김영우
Hi, I've configured Hive metastore to use HiveCatalog in streaming application. So far, most of the features are working fine in the hive integration. However, I have a problem using Hive UDFs. I have already done the prerequisites to use the Hive geospatial UDFs[1]. To sanity check, I ran a query like

Fwd: flink1.12.2 CLI throws an exception when connecting to hive

2021-04-27 Thread 张锴
-- Forwarded message - From: 张锴 Date: Tue, Apr 27, 2021, 1:59 PM Subject: flink1.12.2 CLI throws an exception when connecting to hive To: When connecting to hive with the flink1.12.2 CLI, I can see the catalog and the tables under the specified catalog, but executing a SELECT statement throws an exception. [ERROR] Could not execute SQL statement. Reason: org.apache.hadoop.ipc.RemoteException:

Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Yik San Chan
Hi Dian, I followed up with this PR https://github.com/apache/flink/pull/15790 On Tue, Apr 27, 2021 at 11:03 PM Dian Fu wrote: > Hi Yik San, > > Makes sense to me. :) > > Regards, > Dian > > On Apr 27, 2021 at 9:50 PM, Yik San Chan wrote: > > Hi Dian, > > Wow, this is unexpected. How about adding

Exception handling

2021-04-27 Thread Jacob Sevart
How do we get uncaught exceptions in operators to skip the problematic messages, rather than crash the entire job? Is there an easier or less mistake-prone way to do this than wrapping every operator method in try/catch? And what to do about Map? Since it has to return something, we're either
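One common answer to the Map problem is to switch to flatMap, so the operator can emit nothing for a record it fails to process. A sketch of that shape, shown in PyFlink for illustration (the Java DataStream API version is analogous); parse() here stands in for whatever per-record logic might throw:

```python
from pyflink.datastream.functions import FlatMapFunction


def parse(value):
    # Placeholder for the real per-record logic that might raise.
    return value.upper()


class SafeParse(FlatMapFunction):
    def flat_map(self, value):
        try:
            yield parse(value)
        except Exception:
            # Swallow (or log / route to a dead-letter sink) instead of
            # letting the exception fail the whole job.
            pass

# usage: result_stream = ds.flat_map(SafeParse())  # declare an output type if needed
```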

Taskmanager killed often after migrating to flink 1.12

2021-04-27 Thread Sambaran
Hi there, We have recently migrated from flink 1.7 to 1.12. Although the jobs are running fine, sometimes the task manager gets killed (quite frequently, 2-3 times a day). Logs: INFO org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down

Re: [Stateful Functions] Help for calling remote stateful function (written in Python)

2021-04-27 Thread Igal Shilman
Hello! Your analysis is correct; indeed, what is passed is whatever is handed to withMessageBody(..). Starting with StateFun 3.0, if you need to send a message to a remote function, the message needs to be a TypedValue. You can create an instance of TypedValue manually, or you can add a

Re: Custom metrics in Stateful Functions

2021-04-27 Thread Igal Shilman
Hello Cliff, You are right, defining custom metrics is indeed not supported at the moment. I will file a JIRA issue so we can track this, and we will try to prioritize this feature. Meanwhile, there are a lot of metrics that StateFun already defines, like invocation rates etc.; perhaps you can find it

[Stateful Functions] Help for calling remote stateful function (written in Python)

2021-04-27 Thread Bonino Dario
Dear List, I am trying to call a sample stateful function defined in Python, using the Stateful Function Python SDK, from a Flink pipeline. I am building upon the examples provided for the SDK for Flink DataStream Integration but I am currently stuck on a type cast issue that I am not able

Custom metrics in Stateful Functions

2021-04-27 Thread Cliff Resnick
We think Embedded Statefun is a nicer fit than Datastream for some problem domains, but one thing we miss is support for custom metrics/counters. Is there a way to access the Flink support? It looks like if we want custom metrics we'll need to roll our own.

Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Dian Fu
Hi Yik San, Makes sense to me. :) Regards, Dian > On Apr 27, 2021 at 9:50 PM, Yik San Chan wrote: > > Hi Dian, > > Wow, this is unexpected. How about adding documentation to Python UDF about > this? Again, it can be time-consuming to figure this out. Maybe: > > To be able to run Python UDFs in any

Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Yik San Chan
Hi Dian, Wow, this is unexpected. How about adding documentation to Python UDF about this? Again, it can be time-consuming to figure this out. Maybe: To be able to run Python UDFs in any non-local mode, it is recommended to include your UDF definitions using the -pyfs config option, if your UDFs

Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Dian Fu
I guess this is the magic of cloudpickle. PyFlink depends on cloudpickle to serialize the Python UDF. For the latter case, I guess the whole Python UDF implementation will be serialized. However, for the former case, only the path of the class is serialized. Regards, Dian > On Apr 27, 2021
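A sketch of the two styles being contrasted, with names borrowed from the thread and placeholder logic; the point is that the decorated function is pickled by value, while the class-based UDF is referenced by module path and therefore needs its file (decrypt_fun.py) shipped, e.g. with -pyfs:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf


# Class-based UDF: only the module path is pickled, so the file that
# defines it (decrypt_fun.py) must be shipped to the cluster.
class Decrypt(ScalarFunction):
    def eval(self, s):
        return s[::-1]  # placeholder logic

decrypt_cls = udf(Decrypt(), input_types=[DataTypes.STRING()],
                  result_type=DataTypes.STRING())


# Function-based UDF: cloudpickle serializes the function body itself,
# which is why it may work even without shipping this file separately.
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def decrypt_fn(s):
    return s[::-1]  # placeholder logic
```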

Re: Parsing non-standard JSON from Kafka

2021-04-27 Thread guoyb
OK, thanks! I will try them all. --- Original message --- From: "JasonLee"<17610775...@163.com Sent: Tue, Apr 27, 2021, 7:14 PM To: "user-zh" http://apache-flink.147419.n8.nabble.com/

Re: Log rollover for logs.

2021-04-27 Thread Nicolaus Weidner
Hi John, On Tue, Apr 27, 2021 at 9:47 AM John Smith wrote: > Hi, I'm running flink as a systemd service with... > > [Service] > Type=forking > WorkingDirectory=/opt/flink > User=flink > Group=flink > ExecStart=/opt/flink/bin/taskmanager.sh start > ExecStop=/opt/flink/bin/taskmanager.sh stop >

Re: flink sql: custom function returning nested rows fails with scala.MatchError

2021-04-27 Thread gen
hi, Shengkai Thank you very much for your answer; it solved a problem that had troubled me for days. Following your suggestion, I ran today's (2021-4-27) master build and it works fine, so the issue has indeed been fixed. The version I had been using is 1.12.2. The latest release is currently 1.12.2, which presumably does not include this fix yet. Do you know the release plan for 1.13? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Yik San Chan
Hi Dian, Thanks! Adding -pyfs definitely helps. However, I am curious. If I define my udf this way: ```python @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) def decrypt(s): import pandas as pd d = pd.read_csv('resources.zip/resources/crypt.csv', header=None, index_col=

Re: ModuleNotFound when loading udf from another py file

2021-04-27 Thread Dian Fu
Hi Yik San, From the exception message, it’s clear that it could not find module `decrypt_fun` during execution. You need to specify the file `decrypt_fun.py` as a dependency when submitting the job, e.g. via the -pyfs command line argument. Otherwise, this file will not be available during
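A programmatic equivalent of -pyfs, for anyone not going through the CLI; the file name comes from the thread, and it is assumed here that decrypt_fun.py exposes a ready-made udf object called decrypt:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Ship decrypt_fun.py to the cluster so `import decrypt_fun` works there.
# This mirrors `flink run ... -pyfs decrypt_fun.py`.
t_env.add_python_file("decrypt_fun.py")

from decrypt_fun import decrypt  # noqa: E402 -- assumed to be a udf object
t_env.create_temporary_function("decrypt", decrypt)
```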

ModuleNotFound when loading udf from another py file

2021-04-27 Thread Yik San Chan
Hi, Here's the reproducible code sample: https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3 I implement my Python UDF by extending the ScalarFunction base class in a separate file named decrypt_fun.py, and try to import the udf into my main python file

Re: Re: Parsing non-standard JSON from Kafka

2021-04-27 Thread JasonLee
hi, in SQL you can define the whole record as a single field and then split it to extract the JSON data, or you can write a custom UDF to handle it. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
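A rough sketch of the "read the whole record as one string, then cut out the payload" idea as a PyFlink scalar UDF; the extraction rule (take the last {...} block) is an assumption based on the sample lines in this thread:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def extract_payload(line):
    # Sample record: '2021-04-03x{name=aa} {id : 1, name : abram}'
    # Only the trailing {...} block matters, so return it as a string for
    # further parsing downstream (the payload is not strict JSON).
    start = line.rfind('{')
    return line[start:] if start != -1 else None
```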

Re: Parsing non-standard JSON from Kafka

2021-04-27 Thread guoyb
Can this be done in SQL? --- Original message --- From: "19971028...@163.com"<19971028...@163.com Sent: Tue, Apr 27, 2021, 6:51 PM To: "user-zh"

Re: Parsing non-standard JSON from Kafka

2021-04-27 Thread 19971028...@163.com
You could try writing a Kafka interceptor to handle the JSON. 19971028...@163.com From: guoyb Sent: 2021-04-27 17:55 To: user-zh Subject: Parsing non-standard JSON from Kafka Hello! How can data in the following format be parsed with SQL? string {name=string} {id : 1, name : abram} e.g.: 2021-04-03x{name=aa} {id : 1, name : abram} The third field (the JSON) is the most important one because it contains the data; the first and second fields are optional

Parsing non-standard JSON from Kafka

2021-04-27 Thread guoyb
Hello! How can data in the following format be parsed with SQL? string {name=string} {id : 1, name : abram} e.g.: 2021-04-03x{name=aa} {id : 1, name : abram} The third field (the JSON) is the most important one because it contains the data; the first and second fields are optional. Are there any approaches I could refer to, or a demo?

Re: How to load resource in a PyFlink UDF

2021-04-27 Thread Yik San Chan
Hi Dian, Thank you for the detailed answer! Best, Yik San On Tue, Apr 27, 2021 at 5:42 PM Dian Fu wrote: > Hi Yik San, > > Command line option `-pyarch` could be used to specify archive files such > as Python virtual environment, ML model, data file, etc. > > So for resources.zip, -pyarch

Re: Confusing docs on python.archives

2021-04-27 Thread Dian Fu
Thank you a lot~ > On Apr 27, 2021 at 5:38 PM, Yik San Chan wrote: > > Hi Dian, > > As a follow-up, I fixed the docs here > https://github.com/apache/flink/pull/15783 > > > Best, > Yik San > > On Tue, Apr 27, 2021 at 10:20 AM Dian Fu

Re: How to load resource in a PyFlink UDF

2021-04-27 Thread Dian Fu
Hi Yik San, Command line option `-pyarch` could be used to specify archive files such as Python virtual environment, ML model, data file, etc. So for resources.zip, -pyarch makes more sense than -pyfs. Regards, Dian > On Apr 27, 2021 at 5:14 PM, Yik San Chan wrote: > > Hi Dian, > > Thank you! That
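A sketch combining the pieces from this thread: with -pyarch (or add_python_archive) and no explicit target directory, the archive is extracted into a directory named after the archive file, hence the resources.zip/ prefix in the path. The lookup logic below is a placeholder, not the user's real code:

```python
from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def decrypt(s):
    import pandas as pd
    # The archive shipped via `-pyarch resources.zip` is available under a
    # directory called 'resources.zip' in the UDF's working directory.
    mapping = pd.read_csv('resources.zip/resources/crypt.csv',
                          header=None, index_col=0)
    # Placeholder lookup; the real decryption logic lives in the user's code.
    return str(mapping.loc[s, 1]) if s in mapping.index else s
```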

Re: Confusing docs on python.archives

2021-04-27 Thread Yik San Chan
Hi Dian, As a follow-up, I fixed the docs here https://github.com/apache/flink/pull/15783 Best, Yik San On Tue, Apr 27, 2021 at 10:20 AM Dian Fu wrote: > For the command line arguments, it’s documented in > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html > > On Apr 27, 2021

Re: Re:Re: The wrong Options of Kafka Connector, will make the cluster can not run any job

2021-04-27 Thread cxydevelop
Oh, I was wrong again; the last one was on flink_1.12.2, not flink_1.11.2. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to load resource in a PyFlink UDF

2021-04-27 Thread Yik San Chan
Hi Dian, Thank you! That answers my question. By the way, for my use case, does -pyarch make more sense than -pyfs? Best, Yik San On Tue, Apr 27, 2021 at 4:52 PM Dian Fu wrote: > Hi Yik San, > > Could you try `pd.read_csv('resources.zip/resources/crypt.csv', xxx)`? > > Regards, > Dian > >

Re: How to load resource in a PyFlink UDF

2021-04-27 Thread Dian Fu
Hi Yik San, Could you try `pd.read_csv('resources.zip/resources/crypt.csv', xxx)`? Regards, Dian > On Apr 27, 2021 at 4:39 PM, Yik San Chan wrote: > > Hi, > > My UDF has a dependency on a resource file named crypt.csv that is located > in the resources/ directory. > > ```python > # udf_use_resource.py >

Re:Re: The wrong Options of Kafka Connector, will make the cluster can not run any job

2021-04-27 Thread chenxuying
I have tested the flink job on flink_1.11.2 and flink_1.12.2. The error log I posted before was from the flink_1.11.2 cluster. Now I am running the job on flink_1.11.2. 1. The wrong options of the Kafka connector: the IP is right, the port is wrong, ``` CREATE TABLE KafkaTable ( message STRING ) WITH (

How to load resource in a PyFlink UDF

2021-04-27 Thread Yik San Chan
Hi, My UDF has a dependency on a resource file named crypt.csv that is located in the resources/ directory. ```python # udf_use_resource.py @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING()) def decrypt(s): import pandas as pd d = pd.read_csv('resources/crypt.csv', header=None,

recovery from savepoint raises java.lang.NullPointerException

2021-04-27 Thread 张美玲
2021-04-27 16:19:34 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) at

Error reported after running savepoints generated with the processor api

2021-04-27 Thread 张美玲
2021-04-27 15:59:18 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) at

Re: Contradictory docs: python.files config can include not only python files

2021-04-27 Thread Yik San Chan
Hi Dian, I created a PR to fix the docs. https://github.com/apache/flink/pull/15779 On Tue, Apr 27, 2021 at 2:08 PM Dian Fu wrote: > Thanks for the suggestion. It makes sense to me~. > > On Apr 27, 2021 at 10:28 AM, Yik San Chan wrote: > > Hi Dian, > > If that's the case, shall we reword "Attach custom

Log rollover for logs.

2021-04-27 Thread John Smith
Hi, I'm running flink as a systemd service with... [Service] Type=forking WorkingDirectory=/opt/flink User=flink Group=flink ExecStart=/opt/flink/bin/taskmanager.sh start ExecStop=/opt/flink/bin/taskmanager.sh stop TimeoutSec=30 Restart=on-failure My log4j.properties file is at /opt/flink/conf

Re: flink cdc reading mysql binlog in streaming mode: is the RELOAD privilege required?

2021-04-27 Thread ??????????
---- Original message ---- From: "user-zh"

Re: flink cdc reading mysql binlog in streaming mode: is the RELOAD privilege required?

2021-04-27 Thread MOBIN
You can use the DataStream API and pass custom Debezium properties to the source, e.g.: public static Properties debeziumProperties() { Properties properties = new Properties(); properties.setProperty(..., ...); return properties; } SourceFunction sourceFunction =

Re: Watermarks in Event Time Temporal Join

2021-04-27 Thread Leonard Xu
Hello, Maciej > I agree the watermark should pass on the versioned table side, because > this is the only way to know which version of the record should be used. > But if we mimic the behaviour of the interval join then the main stream watermark > could be skipped. IIRC, the rowtime interval join requires the watermark

Re: Deployment/Memory Configuration/Scalability

2021-04-27 Thread Radoslav Smilyanov
Hi Yangze Guo, Thanks for your reply. > 1. Is it a good idea to have regular savepoints (say on a daily basis)? > > 2. Is it possible to have high availability with Per-Job mode? Or maybe > I should go with session mode and make sure that my flink cluster is > running a single job? > Yes, we can

flink cdc reading mysql binlog in streaming mode: is the RELOAD privilege required?

2021-04-27 Thread ??????????
hi all, when flink cdc reads the binlog from mysql in streaming mode, does the mysql user need the RELOAD privilege? For sql cdc, can debezium.snapshot.locking.mode = none be set so that streaming

Re: Contradictory docs: python.files config can include not only python files

2021-04-27 Thread Dian Fu
Thanks for the suggestion. It makes sense to me~. > 2021年4月27日 上午10:28,Yik San Chan 写道: > > Hi Dian, > > If that's the case, shall we reword "Attach custom python files for job." > into "attach custom files that could be put in PYTHONPATH, e.g., .zip, .whl, > etc." > > Best, > Yik San > >