[jira] [Commented] (FLINK-32086) Cleanup non-reported managed directory on exit of TM

2024-04-22 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839917#comment-17839917
 ] 

xiaogang zhou commented on FLINK-32086:
---

[~Zakelly] Yes, no problem.

> Cleanup non-reported managed directory on exit of TM
> 
>
> Key: FLINK-32086
> URL: https://issues.apache.org/jira/browse/FLINK-32086
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Affects Versions: 1.18.0
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints

2024-04-07 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834636#comment-17834636
 ] 

xiaogang zhou commented on FLINK-32070:
---

[~Zakelly] Yes, sounds good. Let me take a look.

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> ---
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.20.0
>
>
> The FLIP: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>  
> The creation of multiple checkpoint files can lead to a 'file flood' problem, 
> in which a large number of files are written to the checkpoint storage in a 
> short amount of time. This can cause issues in large clusters with high 
> workloads, such as the creation and deletion of many files increasing the 
> amount of file meta modification on DFS, leading to single-machine hotspot 
> issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the 
> performance of object storage (e.g. Amazon S3 and Alibaba OSS) can 
> significantly decrease when listing objects, which is necessary for object 
> name de-duplication before creating an object, further affecting the 
> performance of directory manipulation from the file system's point of view 
> (See [hadoop-aws module 
> documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients],
>  section 'Warning #2: Directories are mimicked').
> While many solutions have been proposed for individual types of state files 
> (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel 
> state), the file flood problems from each type of checkpoint file are similar 
> and lack a systematic view and solution. Therefore, the goal of this FLIP is to 
> establish a unified file merging mechanism to address the file flood problem 
> during checkpoint creation for all types of state files, including keyed, 
> non-keyed, channel, and changelog state. This will significantly improve the 
> system stability and availability of fault tolerance in Flink.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints

2024-04-07 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834609#comment-17834609
 ] 

xiaogang zhou edited comment on FLINK-32070 at 4/7/24 6:36 AM:
---

Is there any branch I can compile to do a POC? And if you are busy with the 
Flink 2.0 state work, I can also help with some of the work on FLIP-306. [~Zakelly] 


was (Author: zhoujira86):
Is there any branch I can look at to do a POC? And if you are busy with the 
Flink 2.0 state work, I can also help with some of the work on this issue. [~Zakelly] 

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> ---
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.20.0
>
>
> The FLIP: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>  
> The creation of multiple checkpoint files can lead to a 'file flood' problem, 
> in which a large number of files are written to the checkpoint storage in a 
> short amount of time. This can cause issues in large clusters with high 
> workloads, such as the creation and deletion of many files increasing the 
> amount of file meta modification on DFS, leading to single-machine hotspot 
> issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the 
> performance of object storage (e.g. Amazon S3 and Alibaba OSS) can 
> significantly decrease when listing objects, which is necessary for object 
> name de-duplication before creating an object, further affecting the 
> performance of directory manipulation from the file system's point of view 
> (See [hadoop-aws module 
> documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients],
>  section 'Warning #2: Directories are mimicked').
> While many solutions have been proposed for individual types of state files 
> (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel 
> state), the file flood problems from each type of checkpoint file are similar 
> and lack a systematic view and solution. Therefore, the goal of this FLIP is to 
> establish a unified file merging mechanism to address the file flood problem 
> during checkpoint creation for all types of state files, including keyed, 
> non-keyed, channel, and changelog state. This will significantly improve the 
> system stability and availability of fault tolerance in Flink.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints

2024-04-07 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834609#comment-17834609
 ] 

xiaogang zhou commented on FLINK-32070:
---

Is there any branch I can look at to do a POC? And if you are busy with the 
Flink 2.0 state work, I can also help with some of the work on this issue. [~Zakelly] 

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> ---
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.20.0
>
>
> The FLIP: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>  
> The creation of multiple checkpoint files can lead to a 'file flood' problem, 
> in which a large number of files are written to the checkpoint storage in a 
> short amount of time. This can cause issues in large clusters with high 
> workloads, such as the creation and deletion of many files increasing the 
> amount of file meta modification on DFS, leading to single-machine hotspot 
> issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the 
> performance of object storage (e.g. Amazon S3 and Alibaba OSS) can 
> significantly decrease when listing objects, which is necessary for object 
> name de-duplication before creating an object, further affecting the 
> performance of directory manipulation from the file system's point of view 
> (See [hadoop-aws module 
> documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients],
>  section 'Warning #2: Directories are mimicked').
> While many solutions have been proposed for individual types of state files 
> (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel 
> state), the file flood problems from each type of checkpoint file are similar 
> and lack a systematic view and solution. Therefore, the goal of this FLIP is to 
> establish a unified file merging mechanism to address the file flood problem 
> during checkpoint creation for all types of state files, including keyed, 
> non-keyed, channel, and changelog state. This will significantly improve the 
> system stability and availability of fault tolerance in Flink.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32070) FLIP-306 Unified File Merging Mechanism for Checkpoints

2024-04-06 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17834580#comment-17834580
 ] 

xiaogang zhou commented on FLINK-32070:
---

[~Zakelly] Hi, we hit a problem where Flink checkpoints produce too many sst 
files, which causes heavy IOPS on HDFS. Can this issue help in that scenario?
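
For reference, once FLIP-306 is released this should reduce the number of checkpoint files by merging them into larger ones. A minimal sketch of enabling it is below; the option keys are assumptions taken from the FLIP and should be verified against the documentation of the release that ships the feature:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileMergingSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed option keys from FLIP-306; verify against the release documentation.
        conf.setString("execution.checkpointing.file-merging.enabled", "true");
        conf.setString("execution.checkpointing.file-merging.max-file-size", "32mb");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60_000); // checkpoint every 60s
        // ... define sources/operators/sinks and call env.execute() as usual
    }
}
{code}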

> FLIP-306 Unified File Merging Mechanism for Checkpoints
> ---
>
> Key: FLINK-32070
> URL: https://issues.apache.org/jira/browse/FLINK-32070
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Checkpointing, Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
> Fix For: 1.20.0
>
>
> The FLIP: 
> [https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints]
>  
> The creation of multiple checkpoint files can lead to a 'file flood' problem, 
> in which a large number of files are written to the checkpoint storage in a 
> short amount of time. This can cause issues in large clusters with high 
> workloads, such as the creation and deletion of many files increasing the 
> amount of file meta modification on DFS, leading to single-machine hotspot 
> issues for meta maintainers (e.g. NameNode in HDFS). Additionally, the 
> performance of object storage (e.g. Amazon S3 and Alibaba OSS) can 
> significantly decrease when listing objects, which is necessary for object 
> name de-duplication before creating an object, further affecting the 
> performance of directory manipulation from the file system's point of view 
> (See [hadoop-aws module 
> documentation|https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#:~:text=an%20intermediate%20state.-,Warning%20%232%3A%20Directories%20are%20mimicked,-The%20S3A%20clients],
>  section 'Warning #2: Directories are mimicked').
> While many solutions have been proposed for individual types of state files 
> (e.g. FLINK-11937 for keyed state (RocksDB) and FLINK-26803 for channel 
> state), the file flood problems from each type of checkpoint file are similar 
> and lack a systematic view and solution. Therefore, the goal of this FLIP is to 
> establish a unified file merging mechanism to address the file flood problem 
> during checkpoint creation for all types of state files, including keyed, 
> non-keyed, channel, and changelog state. This will significantly improve the 
> system stability and availability of fault tolerance in Flink.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user

2024-04-01 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832762#comment-17832762
 ] 

xiaogang zhou commented on FLINK-34976:
---

[~yunta] OK, thanks for the clarification.

> LD_PRELOAD environment may not be effective after su to flink user
> --
>
> Key: FLINK-34976
> URL: https://issues.apache.org/jira/browse/FLINK-34976
> Project: Flink
>  Issue Type: New Feature
>  Components: flink-docker
>Affects Versions: 1.19.0
>Reporter: xiaogang zhou
>Priority: Major
>
> I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Should 
> we create a .bashrc file in the home directory of flink and export LD_PRELOAD 
> for the flink user?
>  
> [https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user

2024-04-01 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou closed FLINK-34976.
-
Resolution: Invalid

> LD_PRELOAD environment may not be effective after su to flink user
> --
>
> Key: FLINK-34976
> URL: https://issues.apache.org/jira/browse/FLINK-34976
> Project: Flink
>  Issue Type: New Feature
>  Components: flink-docker
>Affects Versions: 1.19.0
>Reporter: xiaogang zhou
>Priority: Major
>
> I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Should 
> we create a .bashrc file in the home directory of flink and export LD_PRELOAD 
> for the flink user?
>  
> [https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user

2024-03-31 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-34976:
--
Description: 
I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Do we 
need to create a .bashrc file in the home directory of flink and export 
LD_PRELOAD for the flink user?

 

[https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]
 

  was:
I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Do we 
need to create a .bashrc file in the home directory of flink

 

[https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]
 


> LD_PRELOAD environment may not be effective after su to flink user
> --
>
> Key: FLINK-34976
> URL: https://issues.apache.org/jira/browse/FLINK-34976
> Project: Flink
>  Issue Type: New Feature
>  Components: flink-docker
>Affects Versions: 1.19.0
>Reporter: xiaogang zhou
>Priority: Major
>
> I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Do we 
> need to create a .bashrc file in the home directory of flink and export 
> LD_PRELOAD for the flink user?
>  
> [https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user

2024-03-31 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-34976:
--
Description: 
I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Should we 
create a .bashrc file in the home directory of flink and export LD_PRELOAD for 
the flink user?

 

[https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]
 

  was:
I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Do we 
need to create a .bashrc file in the home directory of flink and export 
LD_PRELOAD for the flink user?

 

[https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]
 


> LD_PRELOAD environment may not be effective after su to flink user
> --
>
> Key: FLINK-34976
> URL: https://issues.apache.org/jira/browse/FLINK-34976
> Project: Flink
>  Issue Type: New Feature
>  Components: flink-docker
>Affects Versions: 1.19.0
>Reporter: xiaogang zhou
>Priority: Major
>
> I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Should 
> we create a .bashrc file in the home directory of flink and export LD_PRELOAD 
> for the flink user?
>  
> [https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user

2024-03-31 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-34976:
--
Description: 
I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Do we 
need to create a .bashrc file in the home directory of flink

 

[https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]
 

> LD_PRELOAD environment may not be effective after su to flink user
> --
>
> Key: FLINK-34976
> URL: https://issues.apache.org/jira/browse/FLINK-34976
> Project: Flink
>  Issue Type: New Feature
>  Components: flink-docker
>Affects Versions: 1.19.0
>Reporter: xiaogang zhou
>Priority: Major
>
> I am not sure if LD_PRELOAD still takes effect after drop_privs_cmd. Do we 
> need to create a .bashrc file in the home directory of flink
>  
> [https://github.com/apache/flink-docker/blob/627987997ca7ec86bcc3d80b26df58aa595b91af/1.17/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L92]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34976) LD_PRELOAD environment may not be effective after su to flink user

2024-03-31 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-34976:
-

 Summary: LD_PRELOAD environment may not be effective after su to 
flink user
 Key: FLINK-34976
 URL: https://issues.apache.org/jira/browse/FLINK-34976
 Project: Flink
  Issue Type: New Feature
  Components: flink-docker
Affects Versions: 1.19.0
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-17593) Support arbitrary recovery mechanism for PartFileWriter

2024-02-04 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814195#comment-17814195
 ] 

xiaogang zhou commented on FLINK-17593:
---

[~maguowei] Hi, for a filesystem that does not support a recoverable writer 
(like the OSS filesystem), it looks like we cannot use the filesystem connector?

> Support arbitrary recovery mechanism for PartFileWriter
> ---
>
> Key: FLINK-17593
> URL: https://issues.apache.org/jira/browse/FLINK-17593
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / FileSystem
>Reporter: Yun Gao
>Assignee: Guowei Ma
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> Currently Bucket relies directly on _RecoverableOutputStream_ provided by 
> FileSystem to achieve snapshotting and recovery of the in-progress part file for 
> all the PartFileWriter implementations. This requires that the 
> PartFileWriter must be based on the OutputStream.
> To support the path-based PartFileWriter required by the Hive Sink, we will 
> first need to abstract the snapshotting mechanism of the PartFileWriter and 
> make RecoverableOutputStream one type of implementation, thus decoupling 
> PartFileWriter from the output streams. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33728) Do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-22 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33728:
--
Summary: Do not rewatch when KubernetesResourceManagerDriver watch fail  
(was: do not rewatch when KubernetesResourceManagerDriver watch fail)

> Do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes ETCD was responding slowly. 
> After Kube recovered after 1 hour, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which caused great pressure on the API Server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover the TM failure, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-14 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17806640#comment-17806640
 ] 

xiaogang zhou commented on FLINK-33728:
---

[~xtsong] [~wangyang0918] Ok, glad to hear that. Would you please help assign 
the ticket to me?

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes ETCD was responding slowly. 
> After Kube recovered after 1 hour, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which caused great pressure on the API Server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover the TM failure, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33741) Expose Rocksdb Histogram statistics in Flink metrics

2024-01-09 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33741:
--
Summary: Expose Rocksdb Histogram statistics in Flink metrics   (was: 
Exposed Rocksdb Histogram statistics in Flink metrics )

> Expose Rocksdb Histogram statistics in Flink metrics 
> -
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
>  Issue Type: New Feature
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Major
>
> I'd like to expose RocksDB histogram metrics like db_get and db_write to enable 
> troubleshooting



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33741) Exposed Rocksdb Histogram statistics in Flink metrics

2024-01-09 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33741:
--
Description: I'd like to expose RocksDB histogram metrics like db_get and 
db_write to enable troubleshooting  (was: I think we can also parse the 
multi-line string of the rocksdb statistics.
{code:java}
// code placeholder
/**
 * DB implements can export properties about their state
 * via this method on a per column family level.
 *
 * If {@code property} is a valid property understood by this DB
 * implementation, fills {@code value} with its current value and
 * returns true. Otherwise returns false.
 *
 * Valid property names include:
 * 
 * "rocksdb.num-files-at-levelN" - return the number of files at
 * level N, where N is an ASCII representation of a level
 * number (e.g. "0").
 * "rocksdb.stats" - returns a multi-line string that describes statistics
 * about the internal operation of the DB.
 * "rocksdb.sstables" - returns a multi-line string that describes all
 *of the sstables that make up the db contents.
 * 
 *
 * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle}
 * instance, or null for the default column family.
 * @param property to be fetched. See above for examples
 * @return property value
 *
 * @throws RocksDBException thrown if error happens in underlying
 *native library.
 */
public String getProperty(
/* @Nullable */ final ColumnFamilyHandle columnFamilyHandle,
final String property) throws RocksDBException { {code}
 

Then we can directly export these latency numbers in the metrics.

 

I'd like to introduce 2 RocksDB statistics-related configuration options.

Then we can customize stats
{code:java}
// code placeholder
Statistics s = new Statistics();
s.setStatsLevel(EXCEPT_TIME_FOR_MUTEX);
currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
.setStatistics(s); {code})

> Exposed Rocksdb Histogram statistics in Flink metrics 
> --
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
>  Issue Type: New Feature
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Major
>
> I'd like to expose RocksDB histogram metrics like db_get and db_write to enable 
> troubleshooting



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33741) Exposed Rocksdb Histogram statistics in Flink metrics

2024-01-09 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33741:
--
Summary: Exposed Rocksdb Histogram statistics in Flink metrics   (was: 
Exposed Rocksdb statistics in Flink metrics and introduce 2 Rocksdb statistic 
related configuration)

> Exposed Rocksdb Histogram statistics in Flink metrics 
> --
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
>  Issue Type: New Feature
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Major
>
> I think we can also parse the multi-line string of the rocksdb statistics.
> {code:java}
> // code placeholder
> /**
>  * DB implements can export properties about their state
>  * via this method on a per column family level.
>  *
>  * If {@code property} is a valid property understood by this DB
>  * implementation, fills {@code value} with its current value and
>  * returns true. Otherwise returns false.
>  *
>  * Valid property names include:
>  * 
>  * "rocksdb.num-files-at-levelN" - return the number of files at
>  * level N, where N is an ASCII representation of a level
>  * number (e.g. "0").
>  * "rocksdb.stats" - returns a multi-line string that describes statistics
>  * about the internal operation of the DB.
>  * "rocksdb.sstables" - returns a multi-line string that describes all
>  *of the sstables that make up the db contents.
>  * 
>  *
>  * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle}
>  * instance, or null for the default column family.
>  * @param property to be fetched. See above for examples
>  * @return property value
>  *
>  * @throws RocksDBException thrown if error happens in underlying
>  *native library.
>  */
> public String getProperty(
> /* @Nullable */ final ColumnFamilyHandle columnFamilyHandle,
> final String property) throws RocksDBException { {code}
>  
> Then we can directly export these latency numbers in the metrics.
>  
> I'd like to introduce 2 RocksDB statistics-related configuration options.
> Then we can customize stats
> {code:java}
> // code placeholder
> Statistics s = new Statistics();
> s.setStatsLevel(EXCEPT_TIME_FOR_MUTEX);
> currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
> .setStatistics(s); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-09 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804638#comment-17804638
 ] 

xiaogang zhou edited comment on FLINK-33728 at 1/9/24 8:53 AM:
---

[~xtsong] In a default Flink setting, when the KubernetesClient disconnects 
from the Kube API server, it will try to reconnect an unlimited number of times, 
since kubernetes.watch.reconnectLimit is -1. But the KubernetesClient treats 
ResourceVersionTooOld as a special exception that escapes the normal reconnects. 
It then causes Flink's FlinkKubeClient to retry the connection 
kubernetes.transactional-operation.max-retries times, and these retries have no 
interval between them. If the watcher does not recover, the JM will kill itself.

So I think the problem we are trying to solve is not only avoiding massive 
numbers of Flink jobs re-creating watches at the same time, but also allowing 
Flink to continue running even when the Kube API server is in a disordered 
state. Most of the time, Flink TMs have no dependency on the API server.

If you think it is not acceptable to recover the watcher only when requesting 
resources, another possible way is to retry re-watching pods periodically.

WDYT? :) 


was (Author: zhoujira86):
[~xtsong] In a default Flink setting, when the KubernetesClient disconnects 
from the Kube API server, it will try to reconnect an unlimited number of times, 
since kubernetes.watch.reconnectLimit is -1. But the KubernetesClient treats 
ResourceVersionTooOld as a special exception that escapes the normal reconnects. 
It then causes Flink's FlinkKubeClient to retry the connection 
kubernetes.transactional-operation.max-retries times, and these retries have no 
interval between them. If the watcher does not recover, the JM will kill itself.

So I think the problem we are trying to solve is not only avoiding massive 
numbers of Flink jobs re-creating watches at the same time, but also allowing 
Flink to continue running even when the Kube API is in a disordered state. Most 
of the time, Flink TMs do not need to be bothered by a bad API server.

If you think it is not acceptable to recover the watcher only when requesting 
resources, another possible way is to retry re-watching pods periodically.

WDYT? :) 

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes ETCD was responding slowly. 
> After Kube recovered after 1 hour, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which caused great pressure on the API Server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover the TM failure, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-09 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804638#comment-17804638
 ] 

xiaogang zhou edited comment on FLINK-33728 at 1/9/24 8:52 AM:
---

[~xtsong] In a default Flink setting, when the KubernetesClient disconnects 
from the Kube API server, it will try to reconnect an unlimited number of times, 
since kubernetes.watch.reconnectLimit is -1. But the KubernetesClient treats 
ResourceVersionTooOld as a special exception that escapes the normal reconnects. 
It then causes Flink's FlinkKubeClient to retry the connection 
kubernetes.transactional-operation.max-retries times, and these retries have no 
interval between them. If the watcher does not recover, the JM will kill itself.

So I think the problem we are trying to solve is not only avoiding massive 
numbers of Flink jobs re-creating watches at the same time, but also allowing 
Flink to continue running even when the Kube API is in a disordered state. Most 
of the time, Flink TMs do not need to be bothered by a bad API server.

If you think it is not acceptable to recover the watcher only when requesting 
resources, another possible way is to retry re-watching pods periodically.

WDYT? :) 


was (Author: zhoujira86):
[~xtsong] In a default Flink setting, when the KubernetesClient disconnects 
from the Kube API server, it will try to reconnect an unlimited number of times, 
since kubernetes.watch.reconnectLimit is -1. But the KubernetesClient treats 
ResourceVersionTooOld as a special exception that escapes the normal reconnects. 
It then causes Flink's FlinkKubeClient to retry the connection 
kubernetes.transactional-operation.max-retries times. If the watcher does not 
recover, the JM will kill itself.

So I think the problem we are trying to solve is not only avoiding massive 
numbers of Flink jobs re-creating watches at the same time, but also allowing 
Flink to continue running even when the Kube API is in a disordered state. Most 
of the time, Flink TMs do not need to be bothered by a bad API server.

If you think it is not acceptable to recover the watcher only when requesting 
resources, another possible way is to retry re-watching pods periodically.

WDYT? :) 

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes ETCD was responding slowly. 
> After Kube recovered after 1 hour, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which caused great pressure on the API Server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover the TM failure, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-09 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804638#comment-17804638
 ] 

xiaogang zhou commented on FLINK-33728:
---

[~xtsong] In a default Flink setting, when the KubernetesClient disconnects 
from the Kube API server, it will try to reconnect an unlimited number of times, 
since kubernetes.watch.reconnectLimit is -1. But the KubernetesClient treats 
ResourceVersionTooOld as a special exception that escapes the normal reconnects. 
It then causes Flink's FlinkKubeClient to retry the connection 
kubernetes.transactional-operation.max-retries times. If the watcher does not 
recover, the JM will kill itself.

So I think the problem we are trying to solve is not only avoiding massive 
numbers of Flink jobs re-creating watches at the same time, but also allowing 
Flink to continue running even when the Kube API is in a disordered state. Most 
of the time, Flink TMs do not need to be bothered by a bad API server.

If you think it is not acceptable to recover the watcher only when requesting 
resources, another possible way is to retry re-watching pods periodically.

WDYT? :) 
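
Below is a minimal sketch of the lazy re-watch idea described above. All class and method names are hypothetical placeholders, not the actual KubernetesResourceManagerDriver API: a failed watch is only recorded, and the watch is re-created the next time resources are requested.
{code:java}
// Hypothetical sketch of lazy re-watching; the names below are placeholders and
// do not match Flink's real KubernetesResourceManagerDriver classes.
public class LazyPodWatcherSketch {

    interface KubernetesWatch {}   // placeholder for a watch handle
    interface PodSpec {}           // placeholder for a TaskManager pod specification

    private volatile boolean watchLost = false;
    private KubernetesWatch watch;

    // Called by the watch callback when the connection fails
    // (e.g. on ResourceVersionTooOld).
    public void onWatchError(Throwable t) {
        // Do not re-watch immediately and do not fail the JobManager;
        // just remember that the watch is gone.
        watchLost = true;
    }

    // Called whenever a new TaskManager pod is requested.
    public void requestResource(PodSpec spec) {
        if (watchLost) {
            // Re-create the watch lazily, only when pod events are needed again.
            watch = createPodWatch();
            watchLost = false;
        }
        createTaskManagerPod(spec);
    }

    private KubernetesWatch createPodWatch() {
        // Re-establish the pod watch against the API server.
        return new KubernetesWatch() {};
    }

    private void createTaskManagerPod(PodSpec spec) {
        // Submit the pod creation request to the API server.
    }
}
{code}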

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes ETCD was responding slowly. 
> After Kube recovered after 1 hour, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which caused great pressure on the API Server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover the TM failure, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-08 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804553#comment-17804553
 ] 

xiaogang zhou commented on FLINK-33728:
---

[~mapohl] I think your concern is really important, and my statement was not 
precise enough. After your reminder, I'd like to change it to:

We can just ignore the disconnection of the watching process {color:#FF}if 
there is no pending request{color}, and try to rewatch once a new 
requestResource is called.

And we can choose to fail all pending CompletableFutures; then 
[requestWorkerIfRequired|https://github.com/apache/flink/blob/2b9b9859253698c3c90ca420f10975e27e6c52d4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java#L332]
 will request the resource again, which will trigger the rewatch.

WDYT [~mapohl] [~xtsong] 

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes ETCD was responding slowly. 
> After Kube recovered after 1 hour, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which caused great pressure on the API Server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover the TM failure, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2024-01-08 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17804214#comment-17804214
 ] 

xiaogang zhou commented on FLINK-33728:
---

Hi Matthias, I hope you have recovered and enjoyed a wonderful holiday :). Can 
we have a discussion about my proposal? [~mapohl] 

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes ETCD was responding slowly. 
> After Kube recovered after 1 hour, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which caused great pressure on the API Server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover the TM failure, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2023-12-19 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17798797#comment-17798797
 ] 

xiaogang zhou commented on FLINK-33728:
---

[~mapohl] Hi Matthias, would you please let me know what additional tests are 
needed so that my proposal can move forward?

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes ETCD was responding slowly. 
> After Kube recovered after 1 hour, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which caused great pressure on the API Server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover the TM failure, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2023-12-11 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795571#comment-17795571
 ] 

xiaogang zhou commented on FLINK-33728:
---

Hi [~mapohl], thanks for the comment above. Sorry for my poor written English 
:P, but I think your re-clarification is exactly what I am proposing. I'd like 
to introduce a lazy re-initialization of the watch mechanism, which tolerates a 
disconnection of the watch until a new pod is requested.

And I think your concern is how we detect a TM loss without an active watcher. 
I have tested my change in a real K8s environment. With a disconnected watcher, 
I killed a TM pod; after no more than 50s, the task restarted with an exception:
{code:java}
// code placeholder
 java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 
flink-6168d34cf9d3a5d31ad8bb02bce6a370-taskmanager-1-8 timed out.
at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1306)
at 
org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitC {code}
Moreover, I think YARN also does not have a watcher mechanism, so Flink jobs 
scheduled on YARN also rely on a heartbeat timeout mechanism.

 

And an active rewatching strategy can really put great pressure on the API 
server, especially in earlier versions that do not set resource version zero in 
the watch-list request.
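
For reference, the roughly 50-second detection time mentioned above matches Flink's default heartbeat settings. A minimal sketch, assuming the standard heartbeat.interval and heartbeat.timeout options (defaults of 10s and 50s respectively):
{code:java}
import org.apache.flink.configuration.Configuration;

public class HeartbeatConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Defaults are 10s interval and 50s timeout, which explains why the killed
        // TM was detected within ~50s even with the pod watcher disconnected.
        conf.setString("heartbeat.interval", "10000");
        conf.setString("heartbeat.timeout", "50000");
        System.out.println(conf);
    }
}
{code}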

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes ETCD was responding slowly. 
> After Kube recovered after 1 hour, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which caused great pressure on the API Server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover the TM failure, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33741) Exposed Rocksdb statistics in Flink metrics and introduce 2 Rocksdb statistic related configuration

2023-12-10 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33741:
--
Summary: Exposed Rocksdb statistics in Flink metrics and introduce 2 
Rocksdb statistic related configuration  (was: Introduce stat dump period and 
statsLevel configuration)

> Exposed Rocksdb statistics in Flink metrics and introduce 2 Rocksdb statistic 
> related configuration
> ---
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
>  Issue Type: New Feature
>Reporter: xiaogang zhou
>Priority: Major
>
> I'd like to introduce 2 RocksDB statistics-related configuration options.
> Then we can customize stats
> {code:java}
> // code placeholder
> Statistics s = new Statistics();
> s.setStatsLevel(EXCEPT_TIME_FOR_MUTEX);
> currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
> .setStatistics(s); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33741) Exposed Rocksdb statistics in Flink metrics and introduce 2 Rocksdb statistic related configuration

2023-12-10 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33741:
--
Description: 
I think we can also parse the multi-line string of the rocksdb statistics.
{code:java}
// code placeholder
/**
 * DB implements can export properties about their state
 * via this method on a per column family level.
 *
 * If {@code property} is a valid property understood by this DB
 * implementation, fills {@code value} with its current value and
 * returns true. Otherwise returns false.
 *
 * Valid property names include:
 * 
 * "rocksdb.num-files-at-levelN" - return the number of files at
 * level N, where N is an ASCII representation of a level
 * number (e.g. "0").
 * "rocksdb.stats" - returns a multi-line string that describes statistics
 * about the internal operation of the DB.
 * "rocksdb.sstables" - returns a multi-line string that describes all
 *of the sstables that make up the db contents.
 * 
 *
 * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle}
 * instance, or null for the default column family.
 * @param property to be fetched. See above for examples
 * @return property value
 *
 * @throws RocksDBException thrown if error happens in underlying
 *native library.
 */
public String getProperty(
/* @Nullable */ final ColumnFamilyHandle columnFamilyHandle,
final String property) throws RocksDBException { {code}
 

Then we can directly export these latency numbers in the metrics.

 

I'd like to introduce 2 RocksDB statistics-related configuration options.

Then we can customize stats
{code:java}
// code placeholder
Statistics s = new Statistics();
s.setStatsLevel(EXCEPT_TIME_FOR_MUTEX);
currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
.setStatistics(s); {code}

  was:
I'd like to introduce 2 RocksDB statistics-related configuration options.

Then we can customize stats
{code:java}
// code placeholder
Statistics s = new Statistics();
s.setStatsLevel(EXCEPT_TIME_FOR_MUTEX);
currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
.setStatistics(s); {code}


> Exposed Rocksdb statistics in Flink metrics and introduce 2 Rocksdb statistic 
> related configuration
> ---
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
>  Issue Type: New Feature
>Reporter: xiaogang zhou
>Priority: Major
>
> I think we can also parse the multi-line string of the rocksdb statistics.
> {code:java}
> // code placeholder
> /**
>  * DB implements can export properties about their state
>  * via this method on a per column family level.
>  *
>  * If {@code property} is a valid property understood by this DB
>  * implementation, fills {@code value} with its current value and
>  * returns true. Otherwise returns false.
>  *
>  * Valid property names include:
>  * 
>  * "rocksdb.num-files-at-levelN" - return the number of files at
>  * level N, where N is an ASCII representation of a level
>  * number (e.g. "0").
>  * "rocksdb.stats" - returns a multi-line string that describes statistics
>  * about the internal operation of the DB.
>  * "rocksdb.sstables" - returns a multi-line string that describes all
>  *of the sstables that make up the db contents.
>  * 
>  *
>  * @param columnFamilyHandle {@link org.rocksdb.ColumnFamilyHandle}
>  * instance, or null for the default column family.
>  * @param property to be fetched. See above for examples
>  * @return property value
>  *
>  * @throws RocksDBException thrown if error happens in underlying
>  *native library.
>  */
> public String getProperty(
> /* @Nullable */ final ColumnFamilyHandle columnFamilyHandle,
> final String property) throws RocksDBException { {code}
>  
> Then we can directly export these latency numbers in the metrics.
>  
> I'd like to introduce 2 RocksDB statistics-related configuration options.
> Then we can customize stats
> {code:java}
> // code placeholder
> Statistics s = new Statistics();
> s.setStatsLevel(EXCEPT_TIME_FOR_MUTEX);
> currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
> .setStatistics(s); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33741) Introduce stat dump period and statsLevel configuration

2023-12-10 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33741:
--
Summary: Introduce stat dump period and statsLevel configuration  (was: 
introduce stat dump period and statsLevel configuration)

> Introduce stat dump period and statsLevel configuration
> ---
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
>  Issue Type: New Feature
>Reporter: xiaogang zhou
>Priority: Major
>
> I'd like to introduce 2 RocksDB statistics-related configuration options.
> Then we can customize stats
> {code:java}
> // code placeholder
> Statistics s = new Statistics();
> s.setStatsLevel(EXCEPT_TIME_FOR_MUTEX);
> currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
> .setStatistics(s); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33741) introduce stat dump period and statsLevel configuration

2023-12-09 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794941#comment-17794941
 ] 

xiaogang zhou commented on FLINK-33741:
---

And I think we can also parse the multi-line string of the RocksDB statistics. 
Then we can directly export these latency numbers in the metrics.
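
A minimal RocksJava sketch of what is described above, assuming a throwaway database opened under /tmp just for illustration: it reads the multi-line rocksdb.stats property and also pulls a latency histogram (DB_GET) directly from a Statistics object, which avoids parsing text at all.
{code:java}
import org.rocksdb.HistogramData;
import org.rocksdb.HistogramType;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;

public class RocksDbStatsSketch {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Statistics stats = new Statistics();
             Options options = new Options()
                     .setCreateIfMissing(true)
                     .setStatistics(stats);
             // Throwaway path used only for this illustration.
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-stats-sketch")) {

            db.put("k".getBytes(), "v".getBytes());
            db.get("k".getBytes());

            // The multi-line statistics string mentioned in the comment.
            String statsDump = db.getProperty("rocksdb.stats");
            System.out.println(statsDump);

            // Histogram data (e.g. DB_GET latency) can be read without parsing text.
            HistogramData get = stats.getHistogramData(HistogramType.DB_GET);
            System.out.println("db.get p95 latency (us): " + get.getPercentile95());
        }
    }
}
{code}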

 

> introduce stat dump period and statsLevel configuration
> ---
>
> Key: FLINK-33741
> URL: https://issues.apache.org/jira/browse/FLINK-33741
> Project: Flink
>  Issue Type: New Feature
>Reporter: xiaogang zhou
>Priority: Major
>
> I'd like to introduce 2 RocksDB statistics-related configuration options.
> Then we can customize stats
> {code:java}
> // code placeholder
> Statistics s = new Statistics();
> s.setStatsLevel(EXCEPT_TIME_FOR_MUTEX);
> currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
> .setStatistics(s); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2023-12-09 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794900#comment-17794900
 ] 

xiaogang zhou edited comment on FLINK-33728 at 12/9/23 9:52 AM:


[~gyfora] My proposal is to keep the JobManager running after the watch fails, 
and not to rewatch before the next resource request is made.

A healthy watch listener can get two kinds of notification from Kubernetes: 
add pod and delete pod.

1. Add pod is only needed when requesting resources; when we are not requesting 
resources, this notification is allowed to be lost.

2. Delete pod lets us detect a pod failure more quickly, but we can also 
discover it via the akka heartbeat timeout.

 

According to the statements above, we can tolerate the loss of the watch 
connection when we are not requesting resources.


was (Author: zhoujira86):
[~gyfora] my proposal is keep the jobmanager running even rewatch fail. 

A healthy watch listener can get notification from kubernetes of two kind:

add pod and delete pod.

1. add pod is necessary when request resource, when we are not requesting 
resource, this notification is allowed to be lost.

2. delete pod can allow us detect pod failure more quickly, but we can also 
discover it by detecting the lost of akka heartbeat timeout.

 

according to the statement above, we can tolerate the lost of watch connection 
when we are not requesting resource

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I met a massive production problem when Kubernetes ETCD was responding slowly. 
> After Kube recovered after 1 hour, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which caused great pressure on the API Server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just ignore the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover the TM failure, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2023-12-08 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17794900#comment-17794900
 ] 

xiaogang zhou commented on FLINK-33728:
---

[~gyfora] my proposal is to keep the jobmanager running even if the rewatch fails. 

A healthy watch listener can get two kinds of notifications from Kubernetes:

pod added and pod deleted.

1. "pod added" is only needed when requesting resources; when we are not requesting 
resources, this notification is allowed to be lost.

2. "pod deleted" allows us to detect pod failures more quickly, but we can also 
discover them through the akka heartbeat timeout.

 

According to the statements above, we can tolerate the loss of the watch connection 
while we are not requesting resources.

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>
> I met a massive production problem when the Kubernetes ETCD responded slowly. 
> After Kubernetes recovered about an hour later, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which put great pressure on the API server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just neglect the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover TM failures, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33741) introduce stat dump period and statsLevel configuration

2023-12-04 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-33741:
-

 Summary: introduce stat dump period and statsLevel configuration
 Key: FLINK-33741
 URL: https://issues.apache.org/jira/browse/FLINK-33741
 Project: Flink
  Issue Type: New Feature
Reporter: xiaogang zhou


I'd like to introduce two RocksDB statistics-related configuration options.

Then we can customize the statistics collection.
{code:java}
// code placeholder
Statistics s = new Statistics();
s.setStatsLevel(EXCEPT_TIME_FOR_MUTEX);
currentOptions.setStatsDumpPeriodSec(internalGetOption(RocksDBConfigurableOptions.STATISTIC_DUMP_PERIOD))
.setStatistics(s); {code}
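
For illustration, the two options might be declared in RocksDBConfigurableOptions roughly as below (keys, defaults and descriptions are placeholders, not a final proposal):
{code:java}
// Hypothetical option declarations; keys, defaults and descriptions are placeholders.
import java.time.Duration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ProposedRocksDBStatisticsOptions {

    public static final ConfigOption<Duration> STATISTIC_DUMP_PERIOD =
            ConfigOptions.key("state.backend.rocksdb.statistics.dump-period")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(10))
                    .withDescription("How often RocksDB dumps its statistics to the LOG.");

    public static final ConfigOption<String> STATISTIC_LEVEL =
            ConfigOptions.key("state.backend.rocksdb.statistics.level")
                    .stringType()
                    .defaultValue("EXCEPT_TIME_FOR_MUTEX")
                    .withDescription("StatsLevel passed to org.rocksdb.Statistics#setStatsLevel.");
}
{code}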



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2023-12-04 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793092#comment-17793092
 ] 

xiaogang zhou commented on FLINK-33728:
---

[~wangyang0918] [~mapohl] [~gyfora] Would you please let me know your thoughts?

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>
> I met a massive production problem when the Kubernetes ETCD responded slowly. 
> After Kubernetes recovered about an hour later, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which put great pressure on the API server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just neglect the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover TM failures, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2023-12-04 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33728:
--
Description: 
I met a massive production problem when the Kubernetes ETCD responded slowly. 
After Kubernetes recovered about an hour later, thousands of Flink jobs using 
KubernetesResourceManagerDriver rewatched when receiving ResourceVersionTooOld, 
which put great pressure on the API server and made the API server fail again... 

 

I am not sure whether it is necessary to call

getResourceEventHandler().onError(throwable)

in the PodCallbackHandlerImpl#handleError method.

 

We can just neglect the disconnection of the watching process and try to rewatch 
once a new requestResource is called. And we can leverage the akka heartbeat 
timeout to discover TM failures, just like YARN mode does.

  was:
Is it necessary to call

getResourceEventHandler().onError(throwable)

in the PodCallbackHandlerImpl#handleError method?

 

We can just neglect the disconnection of the watching process and try to rewatch 
once a new requestResource is called.


> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>
> I met a massive production problem when the Kubernetes ETCD responded slowly. 
> After Kubernetes recovered about an hour later, thousands of Flink jobs using 
> KubernetesResourceManagerDriver rewatched when receiving 
> ResourceVersionTooOld, which put great pressure on the API server and made 
> the API server fail again... 
>  
> I am not sure whether it is necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method.
>  
> We can just neglect the disconnection of the watching process and try to rewatch 
> once a new requestResource is called. And we can leverage the akka heartbeat 
> timeout to discover TM failures, just like YARN mode does.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2023-12-03 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33728:
--
Description: 
Is it necessary to call

getResourceEventHandler().onError(throwable)

in the PodCallbackHandlerImpl#handleError method?

 

We can just neglect the disconnection of the watching process and try to rewatch 
once a new requestResource is called.

> do not rewatch when KubernetesResourceManagerDriver watch fail
> --
>
> Key: FLINK-33728
> URL: https://issues.apache.org/jira/browse/FLINK-33728
> Project: Flink
>  Issue Type: New Feature
>  Components: Deployment / Kubernetes
>Reporter: xiaogang zhou
>Priority: Major
>
> Is it necessary to call
> getResourceEventHandler().onError(throwable)
> in the PodCallbackHandlerImpl#handleError method?
>  
> We can just neglect the disconnection of the watching process and try to rewatch 
> once a new requestResource is called.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33728) do not rewatch when KubernetesResourceManagerDriver watch fail

2023-12-03 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-33728:
-

 Summary: do not rewatch when KubernetesResourceManagerDriver watch 
fail
 Key: FLINK-33728
 URL: https://issues.apache.org/jira/browse/FLINK-33728
 Project: Flink
  Issue Type: New Feature
  Components: Deployment / Kubernetes
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure

2023-10-18 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1035#comment-1035
 ] 

xiaogang zhou commented on FLINK-33249:
---

Hi [~martijnvisser]

https://issues.apache.org/jira/browse/CALCITE-6001 will improve Calcite to omit 
the charset from the generated literal when it is the default charset of the 
dialect. Maybe we should wait for that future Calcite version and set the Flink 
dialect to use UTF-8.

> comment should be parsed by StringLiteral() instead of SqlCharStringLiteral 
> to avoid parsing failure
> 
>
> Key: FLINK-33249
> URL: https://issues.apache.org/jira/browse/FLINK-33249
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> this problem is also recorded in calcite
>  
> https://issues.apache.org/jira/browse/CALCITE-6046
>  
> Hi, I found this problem when I used below code to split SQL statements. the 
> process is SQL string -> SqlNode -> SQL String
> {code:java}
> // code placeholder
> SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect);
> SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig);
> SqlNodeList sqlNodeList = sqlParser.parseStmtList(); 
> sqlParser.parse(sqlNodeList.get(0));{code}
> the Dialect / SqlConformance is a custom one:
> [https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java]
>  
>  
> then I found below SQL
> {code:java}
> // code placeholder
> CREATE TABLE source (
>     a BIGINT
> ) comment '测试test'
> WITH (
>   'connector' = 'test'
> );  {code}
> transformed to
> {code:java}
> // code placeholder
> CREATE TABLE `source` (
>   `a` BIGINT
> )
> COMMENT u&'\5218\51eftest' WITH (
>   'connector' = 'test'
> )  {code}
>  
> and the SQL parser template is like
> {code:java}
> // code placeholder
> SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
> {
> final SqlParserPos startPos = s.pos();
> boolean ifNotExists = false;
> SqlIdentifier tableName;
> List constraints = new 
> ArrayList();
> SqlWatermark watermark = null;
> SqlNodeList columnList = SqlNodeList.EMPTY;
>SqlCharStringLiteral comment = null;
>SqlTableLike tableLike = null;
> SqlNode asQuery = null;
> SqlNodeList propertyList = SqlNodeList.EMPTY;
> SqlNodeList partitionColumns = SqlNodeList.EMPTY;
> SqlParserPos pos = startPos;
> }
> {
> 
> ifNotExists = IfNotExistsOpt()
> tableName = CompoundIdentifier()
> [
>  { pos = getPos(); TableCreationContext ctx = new 
> TableCreationContext();}
> TableColumn(ctx)
> (
>  TableColumn(ctx)
> )*
> {
> pos = pos.plus(getPos());
> columnList = new SqlNodeList(ctx.columnList, pos);
> constraints = ctx.constraints;
> watermark = ctx.watermark;
> }
> 
> ]
> [   {
> String p = SqlParserUtil.parseString(token.image);
> comment = SqlLiteral.createCharString(p, getPos());
> }]
> [
>  
> partitionColumns = ParenthesizedSimpleIdentifierList()
> ]
> [
> 
> propertyList = TableProperties()
> ]
> [
> 
> tableLike = SqlTableLike(getPos())
> {
> return new SqlCreateTableLike(startPos.plus(getPos()),
> tableName,
> columnList,
> constraints,
> propertyList,
> partitionColumns,
> watermark,
> comment,
> tableLike,
> isTemporary,
> ifNotExists);
> }
> |
> 
> asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
> {
> return new SqlCreateTableAs(startPos.plus(getPos()),
> tableName,
> columnList,
> constraints,
> propertyList,
> partitionColumns,
> watermark,
> comment,
> asQuery,
> isTemporary,
> ifNotExists);
> }
> ]
> {
> return new SqlCreateTable(startPos.plus(getPos()),
> tableName,
> columnList,
> constraints,
> propertyList,
> partitionColumns,
> watermark,
> comment,
> isTemporary,
> ifNotExists);
> }
> } {code}
> will give an exception:
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered 
> 

[jira] [Commented] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure

2023-10-12 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17774468#comment-17774468
 ] 

xiaogang zhou commented on FLINK-33249:
---

[~martijnvisser] Sure thing, let me provide a test to demonstrate the issue.
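
For reference, a minimal round-trip sketch of such a test (it reuses the snippet from the description; the Flink parser config wiring is omitted, so this is only an outline):
{code:java}
// Outline only: parser config for the Flink dialect is omitted.
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;

public class CommentRoundTripSketch {

    public static void main(String[] args) throws Exception {
        String ddl =
                "CREATE TABLE source (\n"
                        + "  a BIGINT\n"
                        + ") COMMENT '测试test'\n"
                        + "WITH ('connector' = 'test')";

        SqlNode node = SqlParser.create(ddl /* + Flink parser config */).parseStmtList().get(0);

        // Unparsing renders the comment as COMMENT u&'\5218\51eftest' ...
        String regenerated = node.toString();

        // ... and feeding that back into the parser currently fails with
        // SqlParseException: Encountered "u&'...'".
        SqlParser.create(regenerated /* + same config */).parseStmtList();
    }
}
{code}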

> comment should be parsed by StringLiteral() instead of SqlCharStringLiteral 
> to avoid parsing failure
> 
>
> Key: FLINK-33249
> URL: https://issues.apache.org/jira/browse/FLINK-33249
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> this problem is also recorded in calcite
>  
> https://issues.apache.org/jira/browse/CALCITE-6046
>  
> Hi, I found this problem when I used below code to split SQL statements. the 
> process is SQL string -> SqlNode -> SQL String
> {code:java}
> // code placeholder
> SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect);
> SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig);
> SqlNodeList sqlNodeList = sqlParser.parseStmtList(); 
> sqlParser.parse(sqlNodeList.get(0));{code}
> the Dialect / SqlConformance is a custom one:
> [https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java]
>  
>  
> then I found below SQL
> {code:java}
> // code placeholder
> CREATE TABLE source (
>     a BIGINT
> ) comment '测试test'
> WITH (
>   'connector' = 'test'
> );  {code}
> transformed to
> {code:java}
> // code placeholder
> CREATE TABLE `source` (
>   `a` BIGINT
> )
> COMMENT u&'\5218\51eftest' WITH (
>   'connector' = 'test'
> )  {code}
>  
> and the SQL parser template is like
> {code:java}
> // code placeholder
> SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
> {
> final SqlParserPos startPos = s.pos();
> boolean ifNotExists = false;
> SqlIdentifier tableName;
> List constraints = new 
> ArrayList();
> SqlWatermark watermark = null;
> SqlNodeList columnList = SqlNodeList.EMPTY;
>SqlCharStringLiteral comment = null;
>SqlTableLike tableLike = null;
> SqlNode asQuery = null;
> SqlNodeList propertyList = SqlNodeList.EMPTY;
> SqlNodeList partitionColumns = SqlNodeList.EMPTY;
> SqlParserPos pos = startPos;
> }
> {
> 
> ifNotExists = IfNotExistsOpt()
> tableName = CompoundIdentifier()
> [
>  { pos = getPos(); TableCreationContext ctx = new 
> TableCreationContext();}
> TableColumn(ctx)
> (
>  TableColumn(ctx)
> )*
> {
> pos = pos.plus(getPos());
> columnList = new SqlNodeList(ctx.columnList, pos);
> constraints = ctx.constraints;
> watermark = ctx.watermark;
> }
> 
> ]
> [   {
> String p = SqlParserUtil.parseString(token.image);
> comment = SqlLiteral.createCharString(p, getPos());
> }]
> [
>  
> partitionColumns = ParenthesizedSimpleIdentifierList()
> ]
> [
> 
> propertyList = TableProperties()
> ]
> [
> 
> tableLike = SqlTableLike(getPos())
> {
> return new SqlCreateTableLike(startPos.plus(getPos()),
> tableName,
> columnList,
> constraints,
> propertyList,
> partitionColumns,
> watermark,
> comment,
> tableLike,
> isTemporary,
> ifNotExists);
> }
> |
> 
> asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
> {
> return new SqlCreateTableAs(startPos.plus(getPos()),
> tableName,
> columnList,
> constraints,
> propertyList,
> partitionColumns,
> watermark,
> comment,
> asQuery,
> isTemporary,
> ifNotExists);
> }
> ]
> {
> return new SqlCreateTable(startPos.plus(getPos()),
> tableName,
> columnList,
> constraints,
> propertyList,
> partitionColumns,
> watermark,
> comment,
> isTemporary,
> ifNotExists);
> }
> } {code}
> will give an exception:
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered 
> "u&\'\\5218\\51eftest\'" at line 4, column 9.
> Was expecting:
>      ...
>  
> so I think all the SqlCharStringLiteral should be replaced by StringLiteral()



--
This message was sent by Atlassian Jira

[jira] [Commented] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure

2023-10-12 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17774426#comment-17774426
 ] 

xiaogang zhou commented on FLINK-33249:
---

[~martijnvisser] Hi Martijn, I am not sure whether this is something to be 
fixed in Calcite, as the 
SqlCreateTable
template is in the Flink parser. 

I have attached a URL, would you please have a glance at it?

> comment should be parsed by StringLiteral() instead of SqlCharStringLiteral 
> to avoid parsing failure
> 
>
> Key: FLINK-33249
> URL: https://issues.apache.org/jira/browse/FLINK-33249
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> this problem is also recorded in calcite
>  
> https://issues.apache.org/jira/browse/CALCITE-6046
>  
> Hi, I found this problem when I used below code to split SQL statements. the 
> process is SQL string -> SqlNode -> SQL String
> {code:java}
> // code placeholder
> SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect);
> SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig);
> SqlNodeList sqlNodeList = sqlParser.parseStmtList(); 
> sqlParser.parse(sqlNodeList.get(0));{code}
> the Dialect / SqlConformance is a custom one:
> [https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java]
>  
>  
> then I found below SQL
> {code:java}
> // code placeholder
> CREATE TABLE source (
>     a BIGINT
> ) comment '测试test'
> WITH (
>   'connector' = 'test'
> );  {code}
> transformed to
> {code:java}
> // code placeholder
> CREATE TABLE `source` (
>   `a` BIGINT
> )
> COMMENT u&'\5218\51eftest' WITH (
>   'connector' = 'test'
> )  {code}
>  
> and the SQL parser template is like
> {code:java}
> // code placeholder
> SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
> {
> final SqlParserPos startPos = s.pos();
> boolean ifNotExists = false;
> SqlIdentifier tableName;
> List constraints = new 
> ArrayList();
> SqlWatermark watermark = null;
> SqlNodeList columnList = SqlNodeList.EMPTY;
>SqlCharStringLiteral comment = null;
>SqlTableLike tableLike = null;
> SqlNode asQuery = null;
> SqlNodeList propertyList = SqlNodeList.EMPTY;
> SqlNodeList partitionColumns = SqlNodeList.EMPTY;
> SqlParserPos pos = startPos;
> }
> {
> 
> ifNotExists = IfNotExistsOpt()
> tableName = CompoundIdentifier()
> [
>  { pos = getPos(); TableCreationContext ctx = new 
> TableCreationContext();}
> TableColumn(ctx)
> (
>  TableColumn(ctx)
> )*
> {
> pos = pos.plus(getPos());
> columnList = new SqlNodeList(ctx.columnList, pos);
> constraints = ctx.constraints;
> watermark = ctx.watermark;
> }
> 
> ]
> [   {
> String p = SqlParserUtil.parseString(token.image);
> comment = SqlLiteral.createCharString(p, getPos());
> }]
> [
>  
> partitionColumns = ParenthesizedSimpleIdentifierList()
> ]
> [
> 
> propertyList = TableProperties()
> ]
> [
> 
> tableLike = SqlTableLike(getPos())
> {
> return new SqlCreateTableLike(startPos.plus(getPos()),
> tableName,
> columnList,
> constraints,
> propertyList,
> partitionColumns,
> watermark,
> comment,
> tableLike,
> isTemporary,
> ifNotExists);
> }
> |
> 
> asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
> {
> return new SqlCreateTableAs(startPos.plus(getPos()),
> tableName,
> columnList,
> constraints,
> propertyList,
> partitionColumns,
> watermark,
> comment,
> asQuery,
> isTemporary,
> ifNotExists);
> }
> ]
> {
> return new SqlCreateTable(startPos.plus(getPos()),
> tableName,
> columnList,
> constraints,
> propertyList,
> partitionColumns,
> watermark,
> comment,
> isTemporary,
> ifNotExists);
> }
> } {code}
> will give an exception:
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered 
> "u&\'\\5218\\51eftest\'" at line 4, column 9.
> Was expecting:
>      ...
>  

[jira] [Commented] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure

2023-10-12 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17774339#comment-17774339
 ] 

xiaogang zhou commented on FLINK-33249:
---

I'd like to take this ticket

> comment should be parsed by StringLiteral() instead of SqlCharStringLiteral 
> to avoid parsing failure
> 
>
> Key: FLINK-33249
> URL: https://issues.apache.org/jira/browse/FLINK-33249
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: xiaogang zhou
>Priority: Major
>
> this problem is also recorded in calcite
>  
> https://issues.apache.org/jira/browse/CALCITE-6046
>  
> Hi, I found this problem when I used below code to split SQL statements. the 
> process is SQL string -> SqlNode -> SQL String
> {code:java}
> // code placeholder
> SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect);
> SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig);
> SqlNodeList sqlNodeList = sqlParser.parseStmtList(); 
> sqlParser.parse(sqlNodeList.get(0));{code}
> the Dialect / SqlConformance is a custom one:
> [https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java]
>  
>  
> then I found below SQL
> {code:java}
> // code placeholder
> CREATE TABLE source (
>     a BIGINT
> ) comment '测试test'
> WITH (
>   'connector' = 'test'
> );  {code}
> transformed to
> {code:java}
> // code placeholder
> CREATE TABLE `source` (
>   `a` BIGINT
> )
> COMMENT u&'\5218\51eftest' WITH (
>   'connector' = 'test'
> )  {code}
>  
> and the SQL parser template is like
> {code:java}
> // code placeholder
> SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
> {
> final SqlParserPos startPos = s.pos();
> boolean ifNotExists = false;
> SqlIdentifier tableName;
> List constraints = new 
> ArrayList();
> SqlWatermark watermark = null;
> SqlNodeList columnList = SqlNodeList.EMPTY;
>SqlCharStringLiteral comment = null;
>SqlTableLike tableLike = null;
> SqlNode asQuery = null;
> SqlNodeList propertyList = SqlNodeList.EMPTY;
> SqlNodeList partitionColumns = SqlNodeList.EMPTY;
> SqlParserPos pos = startPos;
> }
> {
> 
> ifNotExists = IfNotExistsOpt()
> tableName = CompoundIdentifier()
> [
>  { pos = getPos(); TableCreationContext ctx = new 
> TableCreationContext();}
> TableColumn(ctx)
> (
>  TableColumn(ctx)
> )*
> {
> pos = pos.plus(getPos());
> columnList = new SqlNodeList(ctx.columnList, pos);
> constraints = ctx.constraints;
> watermark = ctx.watermark;
> }
> 
> ]
> [   {
> String p = SqlParserUtil.parseString(token.image);
> comment = SqlLiteral.createCharString(p, getPos());
> }]
> [
>  
> partitionColumns = ParenthesizedSimpleIdentifierList()
> ]
> [
> 
> propertyList = TableProperties()
> ]
> [
> 
> tableLike = SqlTableLike(getPos())
> {
> return new SqlCreateTableLike(startPos.plus(getPos()),
> tableName,
> columnList,
> constraints,
> propertyList,
> partitionColumns,
> watermark,
> comment,
> tableLike,
> isTemporary,
> ifNotExists);
> }
> |
> 
> asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
> {
> return new SqlCreateTableAs(startPos.plus(getPos()),
> tableName,
> columnList,
> constraints,
> propertyList,
> partitionColumns,
> watermark,
> comment,
> asQuery,
> isTemporary,
> ifNotExists);
> }
> ]
> {
> return new SqlCreateTable(startPos.plus(getPos()),
> tableName,
> columnList,
> constraints,
> propertyList,
> partitionColumns,
> watermark,
> comment,
> isTemporary,
> ifNotExists);
> }
> } {code}
> will give an exception:
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered 
> "u&\'\\5218\\51eftest\'" at line 4, column 9.
> Was expecting:
>      ...
>  
> so I think all the SqlCharStringLiteral should be replaced by StringLiteral()



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure

2023-10-11 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33249:
--
Description: 
this problem is also recorded in calcite

 

https://issues.apache.org/jira/browse/CALCITE-6046

 

Hi, I found this problem when I used below code to split SQL statements. the 
process is SQL string -> SqlNode -> SQL String
{code:java}
// code placeholder
SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect);
SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig);
SqlNodeList sqlNodeList = sqlParser.parseStmtList(); 

sqlParser.parse(sqlNodeList.get(0));{code}
the Dialect / SqlConformance is a custom one:

[https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java]

 

 

then I found below SQL
{code:java}
// code placeholder
CREATE TABLE source (
    a BIGINT
) comment '测试test'
WITH (
  'connector' = 'test'
);  {code}
transformed to
{code:java}
// code placeholder
CREATE TABLE `source` (
  `a` BIGINT
)
COMMENT u&'\5218\51eftest' WITH (
  'connector' = 'test'
)  {code}
 

and the SQL parser template is like
{code:java}
// code placeholder
SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
{
final SqlParserPos startPos = s.pos();
boolean ifNotExists = false;
SqlIdentifier tableName;
List constraints = new ArrayList();
SqlWatermark watermark = null;
SqlNodeList columnList = SqlNodeList.EMPTY;
   SqlCharStringLiteral comment = null;
   SqlTableLike tableLike = null;
SqlNode asQuery = null;

SqlNodeList propertyList = SqlNodeList.EMPTY;
SqlNodeList partitionColumns = SqlNodeList.EMPTY;
SqlParserPos pos = startPos;
}
{


ifNotExists = IfNotExistsOpt()

tableName = CompoundIdentifier()
[
 { pos = getPos(); TableCreationContext ctx = new 
TableCreationContext();}
TableColumn(ctx)
(
 TableColumn(ctx)
)*
{
pos = pos.plus(getPos());
columnList = new SqlNodeList(ctx.columnList, pos);
constraints = ctx.constraints;
watermark = ctx.watermark;
}

]
[   {
String p = SqlParserUtil.parseString(token.image);
comment = SqlLiteral.createCharString(p, getPos());
}]
[
 
partitionColumns = ParenthesizedSimpleIdentifierList()
]
[

propertyList = TableProperties()
]
[

tableLike = SqlTableLike(getPos())
{
return new SqlCreateTableLike(startPos.plus(getPos()),
tableName,
columnList,
constraints,
propertyList,
partitionColumns,
watermark,
comment,
tableLike,
isTemporary,
ifNotExists);
}
|

asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
return new SqlCreateTableAs(startPos.plus(getPos()),
tableName,
columnList,
constraints,
propertyList,
partitionColumns,
watermark,
comment,
asQuery,
isTemporary,
ifNotExists);
}
]
{
return new SqlCreateTable(startPos.plus(getPos()),
tableName,
columnList,
constraints,
propertyList,
partitionColumns,
watermark,
comment,
isTemporary,
ifNotExists);
}
} {code}
will give an exception:

Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered 
"u&\'\\5218\\51eftest\'" at line 4, column 9.
Was expecting:
     ...

 

so I think all the SqlCharStringLiteral should be replaced by 

> comment should be parsed by StringLiteral() instead of SqlCharStringLiteral 
> to avoid parsing failure
> 
>
> Key: FLINK-33249
> URL: https://issues.apache.org/jira/browse/FLINK-33249
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: xiaogang zhou
>Priority: Major
>
> this problem is also recorded in calcite
>  
> https://issues.apache.org/jira/browse/CALCITE-6046
>  
> Hi, I found this problem when I used below code to split SQL statements. the 
> process is SQL string -> SqlNode -> SQL String
> {code:java}
> // code placeholder
> SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect);
> SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig);
> SqlNodeList sqlNodeList = sqlParser.parseStmtList(); 
> 

[jira] [Updated] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure

2023-10-11 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33249:
--
Description: 
this problem is also recorded in calcite

 

https://issues.apache.org/jira/browse/CALCITE-6046

 

Hi, I found this problem when I used below code to split SQL statements. the 
process is SQL string -> SqlNode -> SQL String
{code:java}
// code placeholder
SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect);
SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig);
SqlNodeList sqlNodeList = sqlParser.parseStmtList(); 

sqlParser.parse(sqlNodeList.get(0));{code}
the Dialect / SqlConformance is a custom one:

[https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java]

 

 

then I found below SQL
{code:java}
// code placeholder
CREATE TABLE source (
    a BIGINT
) comment '测试test'
WITH (
  'connector' = 'test'
);  {code}
transformed to
{code:java}
// code placeholder
CREATE TABLE `source` (
  `a` BIGINT
)
COMMENT u&'\5218\51eftest' WITH (
  'connector' = 'test'
)  {code}
 

and the SQL parser template is like
{code:java}
// code placeholder
SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
{
final SqlParserPos startPos = s.pos();
boolean ifNotExists = false;
SqlIdentifier tableName;
List constraints = new ArrayList();
SqlWatermark watermark = null;
SqlNodeList columnList = SqlNodeList.EMPTY;
   SqlCharStringLiteral comment = null;
   SqlTableLike tableLike = null;
SqlNode asQuery = null;

SqlNodeList propertyList = SqlNodeList.EMPTY;
SqlNodeList partitionColumns = SqlNodeList.EMPTY;
SqlParserPos pos = startPos;
}
{


ifNotExists = IfNotExistsOpt()

tableName = CompoundIdentifier()
[
 { pos = getPos(); TableCreationContext ctx = new 
TableCreationContext();}
TableColumn(ctx)
(
 TableColumn(ctx)
)*
{
pos = pos.plus(getPos());
columnList = new SqlNodeList(ctx.columnList, pos);
constraints = ctx.constraints;
watermark = ctx.watermark;
}

]
[   {
String p = SqlParserUtil.parseString(token.image);
comment = SqlLiteral.createCharString(p, getPos());
}]
[
 
partitionColumns = ParenthesizedSimpleIdentifierList()
]
[

propertyList = TableProperties()
]
[

tableLike = SqlTableLike(getPos())
{
return new SqlCreateTableLike(startPos.plus(getPos()),
tableName,
columnList,
constraints,
propertyList,
partitionColumns,
watermark,
comment,
tableLike,
isTemporary,
ifNotExists);
}
|

asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
return new SqlCreateTableAs(startPos.plus(getPos()),
tableName,
columnList,
constraints,
propertyList,
partitionColumns,
watermark,
comment,
asQuery,
isTemporary,
ifNotExists);
}
]
{
return new SqlCreateTable(startPos.plus(getPos()),
tableName,
columnList,
constraints,
propertyList,
partitionColumns,
watermark,
comment,
isTemporary,
ifNotExists);
}
} {code}
will give an exception:

Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered 
"u&\'\\5218\\51eftest\'" at line 4, column 9.
Was expecting:
     ...

 

so I think all the SqlCharStringLiteral should be replaced by StringLiteral()

  was:
this problem is also recorded in calcite

 

https://issues.apache.org/jira/browse/CALCITE-6046

 

Hi, I found this problem when I used below code to split SQL statements. the 
process is SQL string -> SqlNode -> SQL String
{code:java}
// code placeholder
SqlParser.Config parserConfig = getCurrentSqlParserConfig(sqlDialect);
SqlParser sqlParser = SqlParser.create(sqlContent, parserConfig);
SqlNodeList sqlNodeList = sqlParser.parseStmtList(); 

sqlParser.parse(sqlNodeList.get(0));{code}
the Dialect / SqlConformance is a custom one:

[https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java]

 

 

then I found below SQL
{code:java}
// code placeholder
CREATE TABLE source (
    a BIGINT
) comment '测试test'
WITH (
  'connector' = 'test'
);  {code}
transformed to
{code:java}
// code placeholder
CREATE TABLE `source` (
  `a` BIGINT
)
COMMENT u&'\5218\51eftest' WITH (
  

[jira] [Created] (FLINK-33249) comment should be parsed by StringLiteral() instead of SqlCharStringLiteral to avoid parsing failure

2023-10-11 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-33249:
-

 Summary: comment should be parsed by StringLiteral() instead of 
SqlCharStringLiteral to avoid parsing failure
 Key: FLINK-33249
 URL: https://issues.apache.org/jira/browse/FLINK-33249
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.17.1
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33174) enabling tablesample bernoulli in flink

2023-10-09 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773306#comment-17773306
 ] 

xiaogang zhou commented on FLINK-33174:
---

[~libenchao] [~lsy] [~martijnvisser] Thanks all for your comments. Let me 
prepare a FLIP first and wait for the Calcite upgrade.

> enabling tablesample bernoulli in flink
> ---
>
> Key: FLINK-33174
> URL: https://issues.apache.org/jira/browse/FLINK-33174
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: xiaogang zhou
>Priority: Major
>
> I'd like to introduce a table sample function to enable fast sampling on 
> streams. 
> This is inspired by https://issues.apache.org/jira/browse/CALCITE-5971



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33174) enabling tablesample bernoulli in flink

2023-10-07 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772820#comment-17772820
 ] 

xiaogang zhou commented on FLINK-33174:
---

[~libenchao] Hi Benchao, would you please assign the issue to me? 

We can then decide whether to wait for the Calcite bump or find some other way.

> enabling tablesample bernoulli in flink
> ---
>
> Key: FLINK-33174
> URL: https://issues.apache.org/jira/browse/FLINK-33174
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: xiaogang zhou
>Priority: Major
>
> I'd like to introduce a table sample function to enable fast sampling on 
> streams. 
> This is inspired by https://issues.apache.org/jira/browse/CALCITE-5971



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33174) enabling tablesample bernoulli in flink

2023-10-01 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770822#comment-17770822
 ] 

xiaogang zhou commented on FLINK-33174:
---

[~lsy] [~libenchao] Hi, would you please help review the suggestion?

> enabling tablesample bernoulli in flink
> ---
>
> Key: FLINK-33174
> URL: https://issues.apache.org/jira/browse/FLINK-33174
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.17.1
>Reporter: xiaogang zhou
>Priority: Major
>
> I'd like to introduce a table sample function to enable fast sampling on 
> streams. 
> This is inspired by https://issues.apache.org/jira/browse/CALCITE-5971



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33174) enabling tablesample bernoulli in flink

2023-10-01 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-33174:
-

 Summary: enabling tablesample bernoulli in flink
 Key: FLINK-33174
 URL: https://issues.apache.org/jira/browse/FLINK-33174
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.17.1
Reporter: xiaogang zhou


I'd like to introduce a table sample function to enable fast sampling on 
streams. 

This is inspired by https://issues.apache.org/jira/browse/CALCITE-5971
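
Roughly what this would look like from the user side (illustrative only: TABLESAMPLE BERNOULLI is the syntax being proposed here, Flink SQL does not support it yet, and the "orders" table is just an example):
{code:java}
// Illustrative only: TABLESAMPLE BERNOULLI is proposed, not yet supported by Flink SQL.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableSampleSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // ... CREATE TABLE orders ... with some connector ...

        // Keep roughly 1% of the incoming rows, decided independently per row.
        env.executeSql("SELECT * FROM orders TABLESAMPLE BERNOULLI(1)").print();
    }
}
{code}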



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33162) seperate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint

2023-09-27 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33162:
--
Description: 
When starting a job with a large number of taskmanagers, the jobmanager of the 
job failed to respond to any REST request. When looking into the jstack, we found 
all 4 threads were occupied by the metrics fetcher.
{code:java}
// code placeholder
"Flink-DispatcherRestEndpoint-thread-4" #91 daemon prio=5 os_prio=0 
tid=0x7f17e7823000 nid=0x246 waiting for monitor entry [0x7f178e9fe000] 
  java.lang.Thread.State: BLOCKED (on object monitor)   at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
 - waiting to lock <0x0003d5f62638> (a 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
 at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown
 Source) at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)  
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
 at java.lang.Thread.run(Thread.java:748)
   Locked ownable synchronizers:- <0x0003ce80d8f0> (a 
java.util.concurrent.ThreadPoolExecutor$Worker)

"Flink-DispatcherRestEndpoint-thread-3" #88 daemon prio=5 os_prio=0 
tid=0x7f17e88af000 nid=0x243 waiting for monitor entry [0x7f1790dfe000] 
  java.lang.Thread.State: BLOCKED (on object monitor)   at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
 - waiting to lock <0x0003d5f62638> (a 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
 at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown
 Source) at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)  
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
 at java.lang.Thread.run(Thread.java:748)
   Locked ownable synchronizers:- <0x0003ce80df88> (a 
java.util.concurrent.ThreadPoolExecutor$Worker) 

"Flink-DispatcherRestEndpoint-thread-2" #79 daemon prio=5 os_prio=0 
tid=0x7f1793473800 nid=0x23a runnable [0x7f17922fd000]   
java.lang.Thread.State: RUNNABLE   at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.add(MetricStore.java:216)
   at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:82)
 - locked <0x0003d5f62638> (a 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)  at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
 at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown
 Source) at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)  
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 

[jira] [Updated] (FLINK-33162) seperate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint

2023-09-27 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33162:
--
Affects Version/s: 1.13.1
   (was: 1.16.0)

> seperate the executor in DefaultDispatcherResourceManagerComponentFactory for 
> MetricFetcher and webMonitorEndpoint
> --
>
> Key: FLINK-33162
> URL: https://issues.apache.org/jira/browse/FLINK-33162
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Affects Versions: 1.13.1
>Reporter: xiaogang zhou
>Priority: Major
> Fix For: 1.19.0
>
>
> When starting a job with a large number of taskmanagers, the jobmanager of the 
> job failed to respond to any REST request. When looking into the jstack, we found 
> all 4 threads were occupied by the metrics fetcher.
> {code:java}
> // code placeholder
> "Flink-DispatcherRestEndpoint-thread-4" #91 daemon prio=5 os_prio=0 
> tid=0x7f17e7823000 nid=0x246 waiting for monitor entry 
> [0x7f178e9fe000]   java.lang.Thread.State: BLOCKED (on object monitor)
>  at 
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
>  - waiting to lock <0x0003d5f62638> (a 
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at 
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
>  at 
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown
>  Source) at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)  
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>    Locked ownable synchronizers:  - <0x0003ce80d8f0> (a 
> java.util.concurrent.ThreadPoolExecutor$Worker)
> "Flink-DispatcherRestEndpoint-thread-3" #88 daemon prio=5 os_prio=0 
> tid=0x7f17e88af000 nid=0x243 waiting for monitor entry 
> [0x7f1790dfe000]   java.lang.Thread.State: BLOCKED (on object monitor)
>  at 
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
>  - waiting to lock <0x0003d5f62638> (a 
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at 
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
>  at 
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown
>  Source) at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)  
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>    Locked ownable synchronizers:  - <0x0003ce80df88> (a 
> java.util.concurrent.ThreadPoolExecutor$Worker) 
> "Flink-DispatcherRestEndpoint-thread-2" #79 daemon prio=5 os_prio=0 
> tid=0x7f1793473800 nid=0x23a runnable [0x7f17922fd000]   
> java.lang.Thread.State: RUNNABLE at 
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.add(MetricStore.java:216)
>at 
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:82)
>  - locked <0x0003d5f62638> (a 
> 

[jira] [Updated] (FLINK-33162) seperate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint

2023-09-27 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33162:
--
Description: 
When starting a job with a large number of taskmanagers, the jobmanager of the 
job failed to respond to any REST request. When looking into the jstack, we found 
all 4 threads were occupied by the metrics fetcher.
{code:java}
// code placeholder
"Flink-DispatcherRestEndpoint-thread-4" #91 daemon prio=5 os_prio=0 
tid=0x7f17e7823000 nid=0x246 waiting for monitor entry [0x7f178e9fe000] 
  java.lang.Thread.State: BLOCKED (on object monitor)   at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
 - waiting to lock <0x0003d5f62638> (a 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
 at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown
 Source) at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)  
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
 at java.lang.Thread.run(Thread.java:748)
   Locked ownable synchronizers:- <0x0003ce80d8f0> (a 
java.util.concurrent.ThreadPoolExecutor$Worker)

"Flink-DispatcherRestEndpoint-thread-3" #88 daemon prio=5 os_prio=0 
tid=0x7f17e88af000 nid=0x243 waiting for monitor entry [0x7f1790dfe000] 
  java.lang.Thread.State: BLOCKED (on object monitor)   at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:81)
 - waiting to lock <0x0003d5f62638> (a 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore) at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
 at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown
 Source) at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)  
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
 at java.lang.Thread.run(Thread.java:748)
   Locked ownable synchronizers:- <0x0003ce80df88> (a 
java.util.concurrent.ThreadPoolExecutor$Worker) 

"Flink-DispatcherRestEndpoint-thread-2" #79 daemon prio=5 os_prio=0 
tid=0x7f1793473800 nid=0x23a runnable [0x7f17922fd000]   
java.lang.Thread.State: RUNNABLE   at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.add(MetricStore.java:216)
   at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore.addAll(MetricStore.java:82)
 - locked <0x0003d5f62638> (a 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore)  at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl.lambda$queryMetrics$5(MetricFetcherImpl.java:244)
 at 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl$$Lambda$1590/569188012.accept(Unknown
 Source) at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
   at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)  
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 

[jira] [Created] (FLINK-33162) seperate the executor in DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and webMonitorEndpoint

2023-09-27 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-33162:
-

 Summary: seperate the executor in 
DefaultDispatcherResourceManagerComponentFactory for MetricFetcher and 
webMonitorEndpoint
 Key: FLINK-33162
 URL: https://issues.apache.org/jira/browse/FLINK-33162
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Affects Versions: 1.16.0
Reporter: xiaogang zhou
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33038) remove getMinRetentionTime in StreamExecDeduplicate

2023-09-05 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762248#comment-17762248
 ] 

xiaogang zhou edited comment on FLINK-33038 at 9/6/23 2:26 AM:
---

[~Sergey Nuyanzin] Fixed, would you please have a quick review?


was (Author: zhoujira86):
[~Sergey Nuyanzin] [https://github.com/apache/flink/pull/23360] fixed, would 
you please have a quick review?

> remove getMinRetentionTime in StreamExecDeduplicate
> ---
>
> Key: FLINK-33038
> URL: https://issues.apache.org/jira/browse/FLINK-33038
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Minor
> Fix For: 1.19.0
>
>
> I suggest to remove the getMinRetentionTime method in StreamExecDeduplicate 
> as it is not called by anyone and the ttl is controlled by the state meta 
> data.
>  
> Please let me take the issue if possible



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33038) remove getMinRetentionTime in StreamExecDeduplicate

2023-09-05 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762248#comment-17762248
 ] 

xiaogang zhou commented on FLINK-33038:
---

[~Sergey Nuyanzin] [https://github.com/apache/flink/pull/23360] fixed, would 
you please have a quick review?

> remove getMinRetentionTime in StreamExecDeduplicate
> ---
>
> Key: FLINK-33038
> URL: https://issues.apache.org/jira/browse/FLINK-33038
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Minor
> Fix For: 1.19.0
>
>
> I suggest to remove the getMinRetentionTime method in StreamExecDeduplicate 
> as it is not called by anyone and the ttl is controlled by the state meta 
> data.
>  
> Please let me take the issue if possible



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33038) remove getMinRetentionTime in StreamExecDeduplicate

2023-09-05 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762032#comment-17762032
 ] 

xiaogang zhou commented on FLINK-33038:
---

[~snuyanzin] would you please help assign this to me?

also cc the modifier [~qingyue] 

> remove getMinRetentionTime in StreamExecDeduplicate
> ---
>
> Key: FLINK-33038
> URL: https://issues.apache.org/jira/browse/FLINK-33038
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: xiaogang zhou
>Priority: Minor
> Fix For: 1.19.0
>
>
> I suggest to remove the getMinRetentionTime method in StreamExecDeduplicate 
> as it is not called by anyone and the ttl is controlled by the state meta 
> data.
>  
> Please let me take the issue if possible



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33038) remove getMinRetentionTime in StreamExecDeduplicate

2023-09-05 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-33038:
--
Description: 
I suggest removing the getMinRetentionTime method in StreamExecDeduplicate, as 
it is not called by anyone and the TTL is controlled by the state metadata.

 

Please let me take the issue if possible

  was:
I suggest to remove the 

StreamExecDeduplicate method in StreamExecDeduplicate as the ttl is controlled 
by the state meta data.

 

Please let me take the issue if possible


> remove getMinRetentionTime in StreamExecDeduplicate
> ---
>
> Key: FLINK-33038
> URL: https://issues.apache.org/jira/browse/FLINK-33038
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: xiaogang zhou
>Priority: Minor
> Fix For: 1.19.0
>
>
> I suggest to remove the getMinRetentionTime method in StreamExecDeduplicate 
> as it is not called by anyone and the ttl is controlled by the state meta 
> data.
>  
> Please let me take the issue if possible



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-33038) remove getMinRetentionTime in StreamExecDeduplicate

2023-09-05 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-33038:
-

 Summary: remove getMinRetentionTime in StreamExecDeduplicate
 Key: FLINK-33038
 URL: https://issues.apache.org/jira/browse/FLINK-33038
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.18.0
Reporter: xiaogang zhou
 Fix For: 1.19.0


I suggest to remove the 

StreamExecDeduplicate method in StreamExecDeduplicate as the ttl is controlled 
by the state meta data.

 

Please let me take the issue if possible



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-22059) add a new option is rocksdb statebackend to enable job threads setting

2023-09-03 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17761587#comment-17761587
 ] 

xiaogang zhou commented on FLINK-22059:
---

[~Zakelly] yes

> add a new option is rocksdb statebackend to enable job threads setting
> --
>
> Key: FLINK-22059
> URL: https://issues.apache.org/jira/browse/FLINK-22059
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.12.2
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Minor
>  Labels: auto-deprioritized-major, auto-unassigned, stale-assigned
> Fix For: 1.18.0
>
>
> As discussed in FLINK-21688, we are currently using the setIncreaseParallelism 
> function to set the number of RocksDB's working threads. 
>  
> Can we add another setting key to set RocksDB's max background jobs, which 
> would also allow a larger flush thread count?
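A minimal sketch of how a larger background-job budget can be applied today through a custom options factory; the class name and the value 8 are made up for illustration and are not code from this ticket:

{code:java}
import java.util.Collection;

import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class BackgroundJobsOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Unlike setIncreaseParallelism, the max-background-jobs budget also
        // covers the high-priority flush threads.
        return currentOptions.setMaxBackgroundJobs(8);
    }

    @Override
    public ColumnFamilyOptions createColumnFamilyOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions;
    }
}
{code}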



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32765) create view should reuse calcite tree

2023-08-07 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-32765:
--
Description: 
{code:java}
// code placeholder

CREATE TABLE source (
    k2 bigint,
    o1 bigint
) 
COMMENT 'source'
WITH (
  'connector' = 'datagen'
);

CREATE TABLE print_table (
    k1 bigint
) WITH (
  'connector' = 'blackhole'
);

CREATE TABLE print_table2 (
    k1 bigint
) WITH (
  'connector' = 'blackhole'
);

create view v1 as 
  select k2*2 as k2 from source where o1 > 20;


insert into print_table select k2 from v1 where k2 >9;
insert into print_table2 select k2 from v1 where k2 <80; {code}
This SQL causes the logic of view v1 to be built twice. Why can't we create a 
table in executeInternal and keep a QueryOperationCatalogView in the 
CatalogManager?
{code:java}
// code placeholder

public TableResult executeInternal(String statement) 
...
else if (operation instanceof CreateViewOperation) {
CreateViewOperation createViewOperation = (CreateViewOperation) operation;
Table table = 
sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code}
 

 

this could enable us to avoid running codegen multiple times if we could reuse 
some part of the query

 

  was:
{code:java}
// code placeholder

CREATE TABLE source (
    k2 bigint,
    o1 bigint
) 
COMMENT 'source'
WITH (
  'connector' = 'datagen'
);

CREATE TABLE print_table (
    k1 bigint
) WITH (
  'connector' = 'blackhole'
);

CREATE TABLE print_table2 (
    k1 bigint
) WITH (
  'connector' = 'blackhole'
);

create view v1 as 
  select k2*2 as k2 from source where o1 > 20;


insert into print_table select k2 from v1 where k2 >9;
insert into print_table2 select k2 from v1 where k2 <80; {code}
This SQL will cause view v1 logic being created for 2 times.  Why can't us 
create a table in executeSql and keep a QueryOperationCatalogView in 
CatalogManager?
{code:java}
// code placeholder

public TableResult executeSql(String statement) 
...
else if (operation instanceof CreateViewOperation) {
CreateViewOperation createViewOperation = (CreateViewOperation) operation;
Table table = 
sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code}
 

 

this could enable us to avoid codegen for multiple time if we could reuse the 
some part of the query

 


> create view should reuse calcite tree
> -
>
> Key: FLINK-32765
> URL: https://issues.apache.org/jira/browse/FLINK-32765
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> {code:java}
> // code placeholder
> CREATE TABLE source (
>     k2 bigint,
>     o1 bigint
> ) 
> COMMENT 'source'
> WITH (
>   'connector' = 'datagen'
> );
> CREATE TABLE print_table (
>     k1 bigint
> ) WITH (
>   'connector' = 'blackhole'
> );
> CREATE TABLE print_table2 (
>     k1 bigint
> ) WITH (
>   'connector' = 'blackhole'
> );
> create view v1 as 
>   select k2*2 as k2 from source where o1 > 20;
> insert into print_table select k2 from v1 where k2 >9;
> insert into print_table2 select k2 from v1 where k2 <80; {code}
> This SQL causes the logic of view v1 to be built twice. Why can't we create a 
> table in executeInternal and keep a QueryOperationCatalogView in the 
> CatalogManager?
> {code:java}
> // code placeholder
> public TableResult executeInternal(String statement) 
> ...
> else if (operation instanceof CreateViewOperation) {
> CreateViewOperation createViewOperation = (CreateViewOperation) operation;
> Table table = 
> sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code}
>  
>  
> this could enable us to avoid running codegen multiple times if we could reuse 
> some part of the query
>  
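A minimal sketch of the closest thing available today from the user side: registering the view through the Table API should end up as a QueryOperationCatalogView in the CatalogManager, which is roughly what the proposal asks executeInternal to do for a SQL CREATE VIEW. The class name is illustrative and this is not code from the ticket:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class ViewReuseSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // ... execute the source/sink DDL from the description first
        Table v1 = tEnv.sqlQuery("select k2 * 2 as k2 from source where o1 > 20");
        // Registers the already-parsed query as a temporary view instead of
        // re-parsing the view SQL for every statement that references it.
        tEnv.createTemporaryView("v1", v1);
    }
}
{code}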



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32765) create view should reuse calcite tree

2023-08-07 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-32765:
--
Description: 
{code:java}
// code placeholder

CREATE TABLE source (
    k2 bigint,
    o1 bigint
) 
COMMENT 'source'
WITH (
  'connector' = 'datagen'
);

CREATE TABLE print_table (
    k1 bigint
) WITH (
  'connector' = 'blackhole'
);

CREATE TABLE print_table2 (
    k1 bigint
) WITH (
  'connector' = 'blackhole'
);

create view v1 as 
  select k2*2 as k2 from source where o1 > 20;


insert into print_table select k2 from v1 where k2 >9;
insert into print_table2 select k2 from v1 where k2 <80; {code}
This SQL causes the logic of view v1 to be built twice. Why can't we create a 
table in executeInternal and keep a QueryOperationCatalogView in the 
CatalogManager?
{code:java}
// code placeholder

public TableResult executeInternal(String statement) 
...
else if (operation instanceof CreateViewOperation) {
CreateViewOperation createViewOperation = (CreateViewOperation) operation;
Table table = 
sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code}
 

 

this could enable us to avoid running codegen multiple times if we could reuse 
some part of the query

 

  was:
{code:java}
// code placeholder

CREATE TABLE source (
    k2 bigint,
    o1 bigint
) 
COMMENT 'source'
WITH (
  'connector' = 'datagen'
);

CREATE TABLE print_table (
    k1 bigint
) WITH (
  'connector' = 'blackhole'
);

CREATE TABLE print_table2 (
    k1 bigint
) WITH (
  'connector' = 'blackhole'
);

create view v1 as 
  select k2*2 as k2 from source where o1 > 20;


insert into print_table select k2 from v1 where k2 >9;
insert into print_table2 select k2 from v1 where k2 <80; {code}
This SQL will cause view v1 logic being created for 2 times.  Why can't us 
create a table in executeInternal

and keep a QueryOperationCatalogView in CatalogManager?
{code:java}
// code placeholder

public TableResult executeInternal(String statement) 
...
else if (operation instanceof CreateViewOperation) {
CreateViewOperation createViewOperation = (CreateViewOperation) operation;
Table table = 
sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code}
 

 

this could enable us to avoid codegen for multiple time if we could reuse the 
some part of the query

 


> create view should reuse calcite tree
> -
>
> Key: FLINK-32765
> URL: https://issues.apache.org/jira/browse/FLINK-32765
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> {code:java}
> // code placeholder
> CREATE TABLE source (
>     k2 bigint,
>     o1 bigint
> ) 
> COMMENT 'source'
> WITH (
>   'connector' = 'datagen'
> );
> CREATE TABLE print_table (
>     k1 bigint
> ) WITH (
>   'connector' = 'blackhole'
> );
> CREATE TABLE print_table2 (
>     k1 bigint
> ) WITH (
>   'connector' = 'blackhole'
> );
> create view v1 as 
>   select k2*2 as k2 from source where o1 > 20;
> insert into print_table select k2 from v1 where k2 >9;
> insert into print_table2 select k2 from v1 where k2 <80; {code}
> This SQL will cause view v1 logic being created for 2 times.  Why can't us 
> create a table in executeInternal and keep a QueryOperationCatalogView in 
> CatalogManager?
> {code:java}
> // code placeholder
> public TableResult executeInternal(String statement) 
> ...
> else if (operation instanceof CreateViewOperation) {
> CreateViewOperation createViewOperation = (CreateViewOperation) operation;
> Table table = 
> sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code}
>  
>  
> this could enable us to avoid codegen for multiple time if we could reuse the 
> some part of the query
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32765) create view should reuse calcite tree

2023-08-07 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-32765:
--
Description: 
{code:java}
// code placeholder

CREATE TABLE source (
    k2 bigint,
    o1 bigint
) 
COMMENT 'source'
WITH (
  'connector' = 'datagen'
);

CREATE TABLE print_table (
    k1 bigint
) WITH (
  'connector' = 'blackhole'
);

CREATE TABLE print_table2 (
    k1 bigint
) WITH (
  'connector' = 'blackhole'
);

create view v1 as 
  select k2*2 as k2 from source where o1 > 20;


insert into print_table select k2 from v1 where k2 >9;
insert into print_table2 select k2 from v1 where k2 <80; {code}
This SQL will cause view v1 logic being created for 2 times.  Why can't us 
create a table in executeSql and keep a QueryOperationCatalogView in 
CatalogManager?
{code:java}
// code placeholder

public TableResult executeSql(String statement) 
...
else if (operation instanceof CreateViewOperation) {
CreateViewOperation createViewOperation = (CreateViewOperation) operation;
Table table = 
sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code}
 

 

this could enable us to avoid codegen for multiple time if we could reuse the 
some part of the query

 

> create view should reuse calcite tree
> -
>
> Key: FLINK-32765
> URL: https://issues.apache.org/jira/browse/FLINK-32765
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> {code:java}
> // code placeholder
> CREATE TABLE source (
>     k2 bigint,
>     o1 bigint
> ) 
> COMMENT 'source'
> WITH (
>   'connector' = 'datagen'
> );
> CREATE TABLE print_table (
>     k1 bigint
> ) WITH (
>   'connector' = 'blackhole'
> );
> CREATE TABLE print_table2 (
>     k1 bigint
> ) WITH (
>   'connector' = 'blackhole'
> );
> create view v1 as 
>   select k2*2 as k2 from source where o1 > 20;
> insert into print_table select k2 from v1 where k2 >9;
> insert into print_table2 select k2 from v1 where k2 <80; {code}
> This SQL will cause view v1 logic being created for 2 times.  Why can't us 
> create a table in executeSql and keep a QueryOperationCatalogView in 
> CatalogManager?
> {code:java}
> // code placeholder
> public TableResult executeSql(String statement) 
> ...
> else if (operation instanceof CreateViewOperation) {
> CreateViewOperation createViewOperation = (CreateViewOperation) operation;
> Table table = 
> sqlQuery(createViewOperation.getCatalogView().getOriginalQuery()); {code}
>  
>  
> this could enable us to avoid codegen for multiple time if we could reuse the 
> some part of the query
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32765) create view should reuse calcite tree

2023-08-06 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-32765:
--
Component/s: Table SQL / API
 (was: Table SQL / Planner)

> create view should reuse calcite tree
> -
>
> Key: FLINK-32765
> URL: https://issues.apache.org/jira/browse/FLINK-32765
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32765) create view should reuse calcite tree

2023-08-06 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-32765:
-

 Summary: create view should reuse calcite tree
 Key: FLINK-32765
 URL: https://issues.apache.org/jira/browse/FLINK-32765
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.16.1
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32732) auto offset reset should be exposed to user

2023-08-02 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750532#comment-17750532
 ] 

xiaogang zhou commented on FLINK-32732:
---

[~renqs] The main defect of the current implementation is that if the earliest 
records expire very quickly, the task can keep restarting because on startup it 
finds that the offset it wants to resume from no longer exists.

> auto offset reset should be exposed to user
> ---
>
> Key: FLINK-32732
> URL: https://issues.apache.org/jira/browse/FLINK-32732
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> {code:java}
> // code placeholder
> maybeOverride(
> ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> 
> startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
> true); {code}
> Flink currently overrides auto.offset.reset with the value derived from the 
> scan.startup.mode config, so the user's explicit setting does not take effect. 
> I think maybe we should expose this to the user?
>  
> I think that once the job has consumed Kafka records from earliest to latest, 
> scan.startup.mode should no longer influence the Kafka scan behavior. So I 
> suggest changing the override to false.
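To make the defect concrete, a minimal sketch of the situation described above; the broker, topic, and group names are placeholders and this is not code from the ticket:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class AutoOffsetResetSketch {
    public static void main(String[] args) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("topic")
                .setGroupId("group")
                // The user explicitly asks for 'none', but with the current
                // override (the 'true' last argument in the snippet above) this
                // is replaced by the value derived from the OffsetsInitializer.
                .setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}
{code}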



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-32732) auto offset reset should be exposed to user

2023-08-02 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750183#comment-17750183
 ] 

xiaogang zhou edited comment on FLINK-32732 at 8/3/23 2:30 AM:
---

[~renqs]  let me know your thinking please


was (Author: zhoujira86):
[~libenchao] [~renqs]  let me know your thinking please

> auto offset reset should be exposed to user
> ---
>
> Key: FLINK-32732
> URL: https://issues.apache.org/jira/browse/FLINK-32732
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> {code:java}
> // code placeholder
> maybeOverride(
> ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> 
> startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
> true); {code}
> now flink override the auto.offset.reset with the scan.startup.mode config, 
> and user's explicit config does not take effect. I think maybe we should 
> expose this to customer?
>  
> I think after consuming kafka records from earliest to latest, the 
> scan.startup.mode should no longer influence the kafka scan behave. So I 
> suggest change the override to false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32738) PROTOBUF format supports projection push down

2023-08-02 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750527#comment-17750527
 ] 

xiaogang zhou commented on FLINK-32738:
---

[~libenchao] Created, please help assign it to me, thanks a lot.

> PROTOBUF format supports projection push down
> -
>
> Key: FLINK-32738
> URL: https://issues.apache.org/jira/browse/FLINK-32738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.18.0
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Major
> Fix For: 1.18.0
>
>
> support projection push down for protobuf



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32738) PROTOBUF format supports projection push down

2023-08-02 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-32738:
-

 Summary: PROTOBUF format supports projection push down
 Key: FLINK-32738
 URL: https://issues.apache.org/jira/browse/FLINK-32738
 Project: Flink
  Issue Type: Sub-task
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Affects Versions: 1.18.0
Reporter: xiaogang zhou
 Fix For: 1.18.0


support projection push down for protobuf



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32607) Kafka table source and json format support projection pushdown

2023-08-02 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750184#comment-17750184
 ] 

xiaogang zhou commented on FLINK-32607:
---

[~libenchao] Not sure if someone has already taken the pb projection pushdown. 
If not, can you please assign it to me?

> Kafka table source and json format support projection pushdown
> --
>
> Key: FLINK-32607
> URL: https://issues.apache.org/jira/browse/FLINK-32607
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: dalongliu
>Priority: Major
> Fix For: 1.18.0
>
>
> ProjectionPushDown has a huge performance impact and is not currently 
> implemented in Kafka Source, so we can support it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32732) auto offset reset should be exposed to user

2023-08-02 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32732?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17750183#comment-17750183
 ] 

xiaogang zhou commented on FLINK-32732:
---

[~libenchao] [~renqs]  let me know your thinking please

> auto offset reset should be exposed to user
> ---
>
> Key: FLINK-32732
> URL: https://issues.apache.org/jira/browse/FLINK-32732
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> {code:java}
> // code placeholder
> maybeOverride(
> ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> 
> startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
> true); {code}
> now flink override the auto.offset.reset with the scan.startup.mode config, 
> and user's explicit config does not take effect. I think maybe we should 
> expose this to customer?
>  
> I think after consuming kafka records from earliest to latest, the 
> scan.startup.mode should no longer influence the kafka scan behave. So I 
> suggest change the override to false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32732) auto offset reset should be exposed to user

2023-08-02 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-32732:
--
Description: 
{code:java}
// code placeholder
maybeOverride(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,

startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
true); {code}
Flink currently overrides auto.offset.reset with the value derived from the 
scan.startup.mode config, so the user's explicit setting does not take effect. I 
think maybe we should expose this to the user?

 

I think that once the job has consumed Kafka records from earliest to latest, 
scan.startup.mode should no longer influence the Kafka scan behavior. So I 
suggest changing the override to false.

  was:
{code:java}
// code placeholder
maybeOverride(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,

startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
true); {code}
now flink override the auto.offset.reset with the scan.startup.mode config, and 
user's explicit config does not take effect. I think maybe we should expose 
this to customer?


> auto offset reset should be exposed to user
> ---
>
> Key: FLINK-32732
> URL: https://issues.apache.org/jira/browse/FLINK-32732
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> {code:java}
> // code placeholder
> maybeOverride(
> ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> 
> startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
> true); {code}
> now flink override the auto.offset.reset with the scan.startup.mode config, 
> and user's explicit config does not take effect. I think maybe we should 
> expose this to customer?
>  
> I think after consuming kafka records from earliest to latest, the 
> scan.startup.mode should no longer influence the kafka scan behave. So I 
> suggest change the override to false.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32732) auto offset reset should be exposed to user

2023-08-02 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-32732:
--
Description: 
{code:java}
// code placeholder
maybeOverride(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,

startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
true); {code}
now flink override the auto.offset.reset with the scan.startup.mode config, and 
user's explicit config does not take effect. I think maybe we should expose 
this to customer?

> auto offset reset should be exposed to user
> ---
>
> Key: FLINK-32732
> URL: https://issues.apache.org/jira/browse/FLINK-32732
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> {code:java}
> // code placeholder
> maybeOverride(
> ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> 
> startingOffsetsInitializer.getAutoOffsetResetStrategy().name().toLowerCase(),
> true); {code}
> now flink override the auto.offset.reset with the scan.startup.mode config, 
> and user's explicit config does not take effect. I think maybe we should 
> expose this to customer?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32732) auto offset reset should be exposed to user

2023-08-02 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-32732:
-

 Summary: auto offset reset should be exposed to user
 Key: FLINK-32732
 URL: https://issues.apache.org/jira/browse/FLINK-32732
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.16.1
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32132) Cast function CODEGEN does not work as expected when nullOnFailure enabled

2023-05-25 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17726445#comment-17726445
 ] 

xiaogang zhou commented on FLINK-32132:
---

[~luoyuxia] Can you please help review

> Cast function CODEGEN does not work as expected when nullOnFailure enabled
> --
>
> Key: FLINK-32132
> URL: https://issues.apache.org/jira/browse/FLINK-32132
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Assignee: xiaogang zhou
>Priority: Major
>  Labels: pull-request-available
>
> I am trying to generate code cast string to bigint, and got generated code 
> like:
>  
>  
> {code:java}
> // code placeholder
> if (!isNull$14) {
> result$15 = 
> org.apache.flink.table.data.binary.BinaryStringDataUtil.toLong(field$13.trim());
> } else {
> result$15 = -1L;
> }
>castRuleResult$16 = result$15;
>castRuleResultIsNull$17 = isNull$14;
>  } catch (java.lang.Throwable e) {
>castRuleResult$16 = -1L;
>castRuleResultIsNull$17 = true;
>  }
>  // --- End cast section
> out.setLong(0, castRuleResult$16); {code}
> such kind of handle does not provide a perfect solution as the default value 
> of long is set to -1L, which can be meaningful in some case. And can cause 
> some calculation error.
>  
> And I understand the cast returns a bigint not null, But since there is a 
> exception, we should ignore the type restriction, so I suggest to modify the 
> CodeGenUtils.rowSetField like below:
>  
> {code:java}
> // code placeholder
> if (fieldType.isNullable || 
> fieldExpr.nullTerm.startsWith("castRuleResultIsNull")) {
>   s"""
>  |${fieldExpr.code}
>  |if (${fieldExpr.nullTerm}) {
>  |  $setNullField;
>  |} else {
>  |  $writeField;
>  |}
> """.stripMargin
> } else {
>   s"""
>  |${fieldExpr.code}
>  |$writeField;
>""".stripMargin
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-32115) json_value support cache

2023-05-23 Thread xiaogang zhou (Jira)


[ https://issues.apache.org/jira/browse/FLINK-32115 ]


xiaogang zhou deleted comment on FLINK-32115:
---

was (Author: zhoujira86):
[~luoyuxia] Hi yuxia, can you please help review this?

> json_value support cache
> 
>
> Key: FLINK-32115
> URL: https://issues.apache.org/jira/browse/FLINK-32115
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]
>  
> Hive supports caching the previously deserialized JSON object; could we 
> consider using a cached object in JsonValueCallGen? 
>  
> This optimization can greatly improve the performance of SQL like
>  
> select 
> json_value(A, 'xxx'),
> json_value(A, 'yyy'),
> json_value(A, 'zzz'),
> ...
>  
> I added a static LRU cache into SqlJsonUtils, and refactor the 
> jsonValueExpression1 like 
> {code:java}
> private static JsonValueContext jsonValueExpression1(String input) {
> JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input);
> if (parsedJsonContext != null) {
> return parsedJsonContext;
> }
> try {
> parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input));
> } catch (Exception e) {
> parsedJsonContext = JsonValueContext.withException(e);
> }
> EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext);
> return parsedJsonContext;
> } {code}
>  
> and benchmarked like:
> {code:java}
> public static void main(String[] args) {
> String input = 
> "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]};;
> Long start = System.currentTimeMillis();
> for (int i = 0; i < 100; i++) {
> Object dejsonize = jsonValueExpression1(input);
> }
> System.err.println(System.currentTimeMillis() - start);
> } {code}
>  
> time 2 benchmark takes is:
> ||case||milli second taken||
> |cache|33|
> |no cache|1591|
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32115) json_value support cache

2023-05-22 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724801#comment-17724801
 ] 

xiaogang zhou commented on FLINK-32115:
---

[~luoyuxia] Hi yuxia, can you please help review this?

> json_value support cache
> 
>
> Key: FLINK-32115
> URL: https://issues.apache.org/jira/browse/FLINK-32115
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]
>  
> Hive supports caching the previously deserialized JSON object; could we 
> consider using a cached object in JsonValueCallGen? 
>  
> This optimization can greatly improve the performance of SQL like
>  
> select 
> json_value(A, 'xxx'),
> json_value(A, 'yyy'),
> json_value(A, 'zzz'),
> ...
>  
> I added a static LRU cache into SqlJsonUtils, and refactor the 
> jsonValueExpression1 like 
> {code:java}
> private static JsonValueContext jsonValueExpression1(String input) {
> JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input);
> if (parsedJsonContext != null) {
> return parsedJsonContext;
> }
> try {
> parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input));
> } catch (Exception e) {
> parsedJsonContext = JsonValueContext.withException(e);
> }
> EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext);
> return parsedJsonContext;
> } {code}
>  
> and benchmarked like:
> {code:java}
> public static void main(String[] args) {
> String input = 
> "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]};;
> Long start = System.currentTimeMillis();
> for (int i = 0; i < 100; i++) {
> Object dejsonize = jsonValueExpression1(input);
> }
> System.err.println(System.currentTimeMillis() - start);
> } {code}
>  
> the time the two benchmarks take is:
> ||case||milli second taken||
> |cache|33|
> |no cache|1591|
>  
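The EXTRACT_OBJECT_CACHE used above is not shown in the ticket; a minimal sketch of what such a static LRU cache could look like, assuming an access-ordered LinkedHashMap. The class name and capacity are made up, and a shared static cache would additionally need to be thread-safe (e.g. synchronized or kept per subtask):

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical shape of EXTRACT_OBJECT_CACHE: evicts the least recently used
// parsed JSON context once the capacity is exceeded.
public class LruJsonCache<K, V> extends LinkedHashMap<K, V> {

    private final int capacity;

    public LruJsonCache(int capacity) {
        super(16, 0.75f, true); // accessOrder = true gives LRU iteration order
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > capacity;
    }
}
{code}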



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32115) json_value support cache

2023-05-22 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-32115:
--
Description: 
[https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]

 

Hive supports caching the previously deserialized JSON object; could we 
consider using a cached object in JsonValueCallGen? 

 

This optimization can greatly improve the performance of SQL like

 

select 

json_value(A, 'xxx'),

json_value(A, 'yyy'),

json_value(A, 'zzz'),

...

 

I added a static LRU cache into SqlJsonUtils, and refactor the 
jsonValueExpression1 like 
{code:java}
private static JsonValueContext jsonValueExpression1(String input) {
JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input);
if (parsedJsonContext != null) {
return parsedJsonContext;
}
try {
parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input));
} catch (Exception e) {
parsedJsonContext = JsonValueContext.withException(e);
}

EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext);
return parsedJsonContext;
} {code}
 

and benchmarked like:
{code:java}
public static void main(String[] args) {
String input = 
"{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]};;

Long start = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
Object dejsonize = jsonValueExpression1(input);
}
System.err.println(System.currentTimeMillis() - start);

} {code}
 

the time the two benchmarks take is:
||case||milli second taken||
|cache|33|
|no cache|1591|

 

  was:
[https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]

 

hive support json object cache for previous deserialized value, could we 
consider use a cache objects in JsonValueCallGen? 

 

This optimize can improve performance of SQL like

 

select 

json_value(A, 'xxx'),

json_value(A, 'yyy'),

json_value(A, 'zzz'),

...

a lot

 

I added a static LRU cache into SqlJsonUtils, and refactor the 
jsonValueExpression1 like 
{code:java}
private static JsonValueContext jsonValueExpression1(String input) {
JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input);
if (parsedJsonContext != null) {
return parsedJsonContext;
}
try {
parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input));
} catch (Exception e) {
parsedJsonContext = JsonValueContext.withException(e);
}

EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext);
return parsedJsonContext;
} {code}
 

and benchmarked like:
{code:java}
public static void main(String[] args) {
String input = 
"{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]};;

Long start = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
Object dejsonize = jsonValueExpression1(input);
}
System.err.println(System.currentTimeMillis() - start);

} {code}
 

time 2 benchmark takes is:
||case||milli second taken||
|cache|33|
|no cache|1591|

 

 

I 


> json_value support cache
> 
>
> Key: FLINK-32115
> URL: https://issues.apache.org/jira/browse/FLINK-32115
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]
>  
> hive support json object cache for previous deserialized value, could we 
> consider use a cache objects in JsonValueCallGen? 
>  
> This optimize can improve performance of SQL like
>  
> select 
> json_value(A, 'xxx'),
> json_value(A, 'yyy'),
> json_value(A, 'zzz'),
> ...
> a lot
>  
> I added a static LRU cache into SqlJsonUtils, and refactor the 
> jsonValueExpression1 like 
> {code:java}
> private static JsonValueContext jsonValueExpression1(String input) {
> JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input);
> if (parsedJsonContext != null) {
> return parsedJsonContext;
> }
> try {
> parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input));
> } catch (Exception e) {
> parsedJsonContext = JsonValueContext.withException(e);
> }
> EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext);
> return parsedJsonContext;
> } {code}
>  
> and benchmarked like:
> {code:java}
> public static void main(String[] args) {
> String input = 
> "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]};;
> Long start = System.currentTimeMillis();
> for (int i = 0; i < 100; i++) {
> Object dejsonize = 

[jira] [Updated] (FLINK-32115) json_value support cache

2023-05-22 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-32115:
--
Description: 
[https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]

 

hive support json object cache for previous deserialized value, could we 
consider use a cache objects in JsonValueCallGen? 

 

This optimize can improve performance of SQL like

 

select 

json_value(A, 'xxx'),

json_value(A, 'yyy'),

json_value(A, 'zzz'),

...

a lot

 

I added a static LRU cache into SqlJsonUtils, and refactor the 
jsonValueExpression1 like 
{code:java}
private static JsonValueContext jsonValueExpression1(String input) {
JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input);
if (parsedJsonContext != null) {
return parsedJsonContext;
}
try {
parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input));
} catch (Exception e) {
parsedJsonContext = JsonValueContext.withException(e);
}

EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext);
return parsedJsonContext;
} {code}
 

and benchmarked like:
{code:java}
public static void main(String[] args) {
String input = 
"{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]};;

Long start = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
Object dejsonize = jsonValueExpression1(input);
}
System.err.println(System.currentTimeMillis() - start);

} {code}
 

time 2 benchmark takes is:
||case||milli second taken||
|cache|33|
|no cache|1591|

 

 

I 

  was:
[https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]

 

hive support json object cache for previous deserialized value, could we 
consider use a cache objects in JsonValueCallGen? 

 

This optimize can improve performance of SQL like

 

select 

json_value(A, 'xxx'),

json_value(A, 'yyy'),

json_value(A, 'zzz'),

...

a lot

 

 

I have tested it with SQL like (keys are replaced for security reason)

 

insert into blackhole 
select

   JSON_VALUE(`message`,'$.a') as scene,
   JSON_VALUE(`message`,'$.b') as screen_height,
   JSON_VALUE(`message`,'$.c') as longitude,
   JSON_VALUE(`message`,'$.d') as device_id,
   JSON_VALUE(`message`,'$.e') as receive_time,
   JSON_VALUE(`message`,'$.f') as app_build,
   JSON_VALUE(`message`,'$.g') as track_id,
   JSON_VALUE(`message`,'$.h') as distinct_id
from xxx;

 

the cached UDF is about 2 times the speed the nocache UDF do.


> json_value support cache
> 
>
> Key: FLINK-32115
> URL: https://issues.apache.org/jira/browse/FLINK-32115
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]
>  
> hive support json object cache for previous deserialized value, could we 
> consider use a cache objects in JsonValueCallGen? 
>  
> This optimize can improve performance of SQL like
>  
> select 
> json_value(A, 'xxx'),
> json_value(A, 'yyy'),
> json_value(A, 'zzz'),
> ...
> a lot
>  
> I added a static LRU cache into SqlJsonUtils, and refactor the 
> jsonValueExpression1 like 
> {code:java}
> private static JsonValueContext jsonValueExpression1(String input) {
> JsonValueContext parsedJsonContext = EXTRACT_OBJECT_CACHE.get(input);
> if (parsedJsonContext != null) {
> return parsedJsonContext;
> }
> try {
> parsedJsonContext = JsonValueContext.withJavaObj(dejsonize(input));
> } catch (Exception e) {
> parsedJsonContext = JsonValueContext.withException(e);
> }
> EXTRACT_OBJECT_CACHE.put(input, parsedJsonContext);
> return parsedJsonContext;
> } {code}
>  
> and benchmarked like:
> {code:java}
> public static void main(String[] args) {
> String input = 
> "{\"social\":[{\"weibo\":\"https://weibo.com/xiaoming\"},{\"github\":\"https://github.com/xiaoming\"}]};;
> Long start = System.currentTimeMillis();
> for (int i = 0; i < 100; i++) {
> Object dejsonize = jsonValueExpression1(input);
> }
> System.err.println(System.currentTimeMillis() - start);
> } {code}
>  
> time 2 benchmark takes is:
> ||case||milli second taken||
> |cache|33|
> |no cache|1591|
>  
>  
> I 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32132) Cast function CODEGEN does not work as expected when nullOnFailure enabled

2023-05-22 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724780#comment-17724780
 ] 

xiaogang zhou commented on FLINK-32132:
---

[~luoyuxia] yes, please assign to me

> Cast function CODEGEN does not work as expected when nullOnFailure enabled
> --
>
> Key: FLINK-32132
> URL: https://issues.apache.org/jira/browse/FLINK-32132
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> I am trying to generate code cast string to bigint, and got generated code 
> like:
>  
>  
> {code:java}
> // code placeholder
> if (!isNull$14) {
> result$15 = 
> org.apache.flink.table.data.binary.BinaryStringDataUtil.toLong(field$13.trim());
> } else {
> result$15 = -1L;
> }
>castRuleResult$16 = result$15;
>castRuleResultIsNull$17 = isNull$14;
>  } catch (java.lang.Throwable e) {
>castRuleResult$16 = -1L;
>castRuleResultIsNull$17 = true;
>  }
>  // --- End cast section
> out.setLong(0, castRuleResult$16); {code}
> such kind of handle does not provide a perfect solution as the default value 
> of long is set to -1L, which can be meaningful in some case. And can cause 
> some calculation error.
>  
> And I understand the cast returns a bigint not null, But since there is a 
> exception, we should ignore the type restriction, so I suggest to modify the 
> CodeGenUtils.rowSetField like below:
>  
> {code:java}
> // code placeholder
> if (fieldType.isNullable || 
> fieldExpr.nullTerm.startsWith("castRuleResultIsNull")) {
>   s"""
>  |${fieldExpr.code}
>  |if (${fieldExpr.nullTerm}) {
>  |  $setNullField;
>  |} else {
>  |  $writeField;
>  |}
> """.stripMargin
> } else {
>   s"""
>  |${fieldExpr.code}
>  |$writeField;
>""".stripMargin
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32132) Cast function CODEGEN does not work as expected when nullOnFailure enabled

2023-05-19 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-32132:
--
Description: 
I am trying to generate code that casts a string to bigint, and got generated code like:

 

 
{code:java}
// code placeholder

if (!isNull$14) {
result$15 = 
org.apache.flink.table.data.binary.BinaryStringDataUtil.toLong(field$13.trim());
} else {
result$15 = -1L;
}

   castRuleResult$16 = result$15;
   castRuleResultIsNull$17 = isNull$14;
 } catch (java.lang.Throwable e) {
   castRuleResult$16 = -1L;
   castRuleResultIsNull$17 = true;
 }
 // --- End cast section

out.setLong(0, castRuleResult$16); {code}
This kind of handling is not a perfect solution, as the default value of long is 
set to -1L, which can be a meaningful value in some cases and can therefore 
cause calculation errors.
 
I understand that the cast returns a BIGINT NOT NULL, but since there is an 
exception, we should ignore the type restriction, so I suggest modifying 
CodeGenUtils.rowSetField as below:
 
{code:java}
// code placeholder

if (fieldType.isNullable || 
fieldExpr.nullTerm.startsWith("castRuleResultIsNull")) {
  s"""
 |${fieldExpr.code}
 |if (${fieldExpr.nullTerm}) {
 |  $setNullField;
 |} else {
 |  $writeField;
 |}
""".stripMargin
} else {
  s"""
 |${fieldExpr.code}
 |$writeField;
   """.stripMargin
} {code}

> Cast function CODEGEN does not work as expected when nullOnFailure enabled
> --
>
> Key: FLINK-32132
> URL: https://issues.apache.org/jira/browse/FLINK-32132
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> I am trying to generate code cast string to bigint, and got generated code 
> like:
>  
>  
> {code:java}
> // code placeholder
> if (!isNull$14) {
> result$15 = 
> org.apache.flink.table.data.binary.BinaryStringDataUtil.toLong(field$13.trim());
> } else {
> result$15 = -1L;
> }
>castRuleResult$16 = result$15;
>castRuleResultIsNull$17 = isNull$14;
>  } catch (java.lang.Throwable e) {
>castRuleResult$16 = -1L;
>castRuleResultIsNull$17 = true;
>  }
>  // --- End cast section
> out.setLong(0, castRuleResult$16); {code}
> such kind of handle does not provide a perfect solution as the default value 
> of long is set to -1L, which can be meaningful in some case. And can cause 
> some calculation error.
>  
> And I understand the cast returns a bigint not null, But since there is a 
> exception, we should ignore the type restriction, so I suggest to modify the 
> CodeGenUtils.rowSetField like below:
>  
> {code:java}
> // code placeholder
> if (fieldType.isNullable || 
> fieldExpr.nullTerm.startsWith("castRuleResultIsNull")) {
>   s"""
>  |${fieldExpr.code}
>  |if (${fieldExpr.nullTerm}) {
>  |  $setNullField;
>  |} else {
>  |  $writeField;
>  |}
> """.stripMargin
> } else {
>   s"""
>  |${fieldExpr.code}
>  |$writeField;
>""".stripMargin
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32132) Cast function CODEGEN does not work as expected when nullOnFailure enabled

2023-05-19 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-32132:
-

 Summary: Cast function CODEGEN does not work as expected when 
nullOnFailure enabled
 Key: FLINK-32132
 URL: https://issues.apache.org/jira/browse/FLINK-32132
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.16.1
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32115) json_value support cache

2023-05-17 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-32115:
--
Description: 
[https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]

 

hive support json object cache for previous deserialized value, could we 
consider use a cache objects in JsonValueCallGen? 

 

This optimize can improve performance of SQL like

 

select 

json_value(A, 'xxx'),

json_value(A, 'yyy'),

json_value(A, 'zzz'),

...

a lot

 

 

I have tested it with SQL like (keys are replaced for security reason)

 

insert into blackhole 
select

   JSON_VALUE(`message`,'$.a') as scene,
   JSON_VALUE(`message`,'$.b') as screen_height,
   JSON_VALUE(`message`,'$.c') as longitude,
   JSON_VALUE(`message`,'$.d') as device_id,
   JSON_VALUE(`message`,'$.e') as receive_time,
   JSON_VALUE(`message`,'$.f') as app_build,
   JSON_VALUE(`message`,'$.g') as track_id,
   JSON_VALUE(`message`,'$.h') as distinct_id
from xxx;

 

the cached UDF is about 2 times the speed the nocache UDF do.

  was:
[https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]

 

hive support json object cache for previous deserialized value, could we 
consider use a cache objects in JsonValueCallGen? 

 

This optimize can improve performance of SQL like

 

select 

json_value(A, 'xxx'),

json_value(A, 'yyy'),

json_value(A, 'zzz'),

...

a lot


> json_value support cache
> 
>
> Key: FLINK-32115
> URL: https://issues.apache.org/jira/browse/FLINK-32115
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]
>  
> hive support json object cache for previous deserialized value, could we 
> consider use a cache objects in JsonValueCallGen? 
>  
> This optimize can improve performance of SQL like
>  
> select 
> json_value(A, 'xxx'),
> json_value(A, 'yyy'),
> json_value(A, 'zzz'),
> ...
> a lot
>  
>  
> I have tested it with SQL like (keys are replaced for security reason)
>  
> insert into blackhole 
> select
>    JSON_VALUE(`message`,'$.a') as scene,
>    JSON_VALUE(`message`,'$.b') as screen_height,
>    JSON_VALUE(`message`,'$.c') as longitude,
>    JSON_VALUE(`message`,'$.d') as device_id,
>    JSON_VALUE(`message`,'$.e') as receive_time,
>    JSON_VALUE(`message`,'$.f') as app_build,
>    JSON_VALUE(`message`,'$.g') as track_id,
>    JSON_VALUE(`message`,'$.h') as distinct_id
> from xxx;
>  
> the cached UDF is about 2 times the speed the nocache UDF do.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32115) json_value support cache

2023-05-16 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-32115:
--
Description: 
[https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]

 

hive support json object cache for previous deserialized value, could we 
consider use a cache objects in JsonValueCallGen? 

 

This optimize can improve performance of SQL like

 

select 

json_value(A, 'xxx'),

json_value(A, 'yyy'),

json_value(A, 'zzz'),

...

a lot

  was:
[https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]

 

hive support json object cache for previous deserialized value, could we 
consider use a cache objects in JsonValueCallGen? 


> json_value support cache
> 
>
> Key: FLINK-32115
> URL: https://issues.apache.org/jira/browse/FLINK-32115
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
>
> [https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]
>  
> hive support json object cache for previous deserialized value, could we 
> consider use a cache objects in JsonValueCallGen? 
>  
> This optimize can improve performance of SQL like
>  
> select 
> json_value(A, 'xxx'),
> json_value(A, 'yyy'),
> json_value(A, 'zzz'),
> ...
> a lot



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32115) json_value support cache

2023-05-16 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-32115:
-

 Summary: json_value support cache
 Key: FLINK-32115
 URL: https://issues.apache.org/jira/browse/FLINK-32115
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Affects Versions: 1.16.1
Reporter: xiaogang zhou


[https://github.com/apache/hive/blob/storage-branch-2.3/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFJSONTuple.java]

 

hive support json object cache for previous deserialized value, could we 
consider use a cache objects in JsonValueCallGen? 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-16009) Native support for the Variable-length and Zig-zag Variable-length integers

2023-05-05 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17719745#comment-17719745
 ] 

xiaogang zhou commented on FLINK-16009:
---

[~nkruber] [~gaoyunhaii] 

Hi masters,

I have created a FLIP WIKI

[https://cwiki.apache.org/confluence/display/FLINK/%5BWIP%5DFLIP-310%3Ause+VARINT+and+ZIGZAG+to+encode+ROWDATA+in+state]

 

Can you please help review it and let me know whether I can take this ticket?

> Native support for the Variable-length and Zig-zag Variable-length integers
> ---
>
> Key: FLINK-16009
> URL: https://issues.apache.org/jira/browse/FLINK-16009
> Project: Flink
>  Issue Type: Improvement
>  Components: API / Type Serialization System
>Reporter: Yun Gao
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> Currently Flink only support fixed-length integers. However, in many cases 
> the values of the integer fields tend to be small, and we could reduce the 
> size of serialized values by using [Variable length 
> encoding|https://developers.google.com/protocol-buffers/docs/encoding#varints]
>  and  [Zig-zag variable-length 
> encoding|https://developers.google.com/protocol-buffers/docs/encoding#signed-integers].
>  
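For reference, a minimal sketch of the two encodings the ticket refers to; this is the textbook protobuf-style scheme, not the serializer code proposed in the FLIP:

{code:java}
public final class VarIntCodec {

    // ZigZag maps values with small magnitude to small unsigned numbers:
    // 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    static int zigZagEncode(int n) {
        return (n << 1) ^ (n >> 31);
    }

    static int zigZagDecode(int z) {
        return (z >>> 1) ^ -(z & 1);
    }

    // Varint: 7 payload bits per byte, the high bit marks "more bytes follow".
    // Returns the new write position.
    static int writeVarInt(int value, byte[] buffer, int pos) {
        while ((value & ~0x7F) != 0) {
            buffer[pos++] = (byte) ((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        buffer[pos++] = (byte) value;
        return pos;
    }
}
{code}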



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31225) rocksdb max open file can lead to oom

2023-02-27 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693953#comment-17693953
 ] 

xiaogang zhou edited comment on FLINK-31225 at 2/27/23 10:54 AM:
-

[~yunta] Master

I logged onto the container, listed the directory, and found:

/tmp/flink-io-b7483fcd-50aa-45d2-bd8e-a2b0862c6323/job__op_LegacyKeyedCoProcessOperator_a3bf5557de0062839a60e12819947e17__12_50__uuid_b91b9766-b8af-4de3-9a01-b5dcce29a7bf/db#
 ll | grep sst
-rw-r--r-- 1 flink flink 30662931 Feb 27 18:20 002512.sst
-rw-r--r-- 1 flink flink  1788364 Feb 27 18:30 002513.sst
-rw-r--r-- 1 flink flink  3209306 Feb 27 18:40 002515.sst
-rw-r--r-- 1 flink flink 13443570 Feb 27 18:40 002517.sst
-rw-r--r-- 1 flink flink  1694438 Feb 27 18:50 002518.sst
-rw-r--r-- 1 flink flink  1509487 Feb 27 18:50 002519.sst

 

I think the LSM tree merge process will delete L0 files.

 

And as mentioned above, in [https://github.com/facebook/rocksdb/issues/4112] someone 
reported this: 

 

this is not exactly a leak
but a lot of memory is allocated
but not released
const int table_cache_size = (mutable_db_options_.max_open_files == -1)
? TableCache::kInfiniteCapacity
: mutable_db_options_.max_open_files - 10;
table_cache_ = NewLRUCache(table_cache_size,
immutable_db_options_.table_cache_numshardbits);
all allocated records are stored in this cache
mutable_db_options_.max_open_files is equal 1
so table_cache_size= 4 mb

 

It seems the mutable_db_options_.max_open_files = -1 configuration keeps 
TableReaders in memory (even if the sst file has already been deleted?), which 
causes memory to keep growing.


was (Author: zhoujira86):
[~yunta] Master

I logged onto the container,  do a list and find out:

/tmp/flink-io-b7483fcd-50aa-45d2-bd8e-a2b0862c6323/job__op_LegacyKeyedCoProcessOperator_a3bf5557de0062839a60e12819947e17__12_50__uuid_b91b9766-b8af-4de3-9a01-b5dcce29a7bf/db#
 ll | grep sst
-rw-r--r-- 1 flink flink 30662931 Feb 27 18:20 002512.sst
-rw-r--r-- 1 flink flink  1788364 Feb 27 18:30 002513.sst
-rw-r--r-- 1 flink flink  3209306 Feb 27 18:40 002515.sst
-rw-r--r-- 1 flink flink 13443570 Feb 27 18:40 002517.sst
-rw-r--r-- 1 flink flink  1694438 Feb 27 18:50 002518.sst
-rw-r--r-- 1 flink flink  1509487 Feb 27 18:50 002519.sst

 

I think the LSM tree merge process will delete L0 files.

 

And as mentioned above [https://github.com/facebook/rocksdb/issues/4112] . some 
one mentioned this. 

 

this is not exactly a leak
but a lot of memory is allocated
but not released
const int table_cache_size = (mutable_db_options_.max_open_files == -1)
? TableCache::kInfiniteCapacity
: mutable_db_options_.max_open_files - 10;
table_cache_ = NewLRUCache(table_cache_size,
immutable_db_options_.table_cache_numshardbits);
all allocated records are stored in this cache
mutable_db_options_.max_open_files is equal 1
so table_cache_size= 4 mb

 

seems this mutable_db_options_.max_open_files  = -1 configuration will save 
TableReader in memory, which will cause memory keep growing problem.

> rocksdb max open file can lead to oom 
> --
>
> Key: FLINK-31225
> URL: https://issues.apache.org/jira/browse/FLINK-31225
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-26-12-08-49-717.png, leak_test(2).png
>
>
> the default value for 
> state.backend.rocksdb.files.open
> is -1
>  
> [https://github.com/facebook/rocksdb/issues/4112] this issue tells us that 
> RocksDB will not close the fds, so this can lead to an OOM issue.
>  
> I can also reproduce the situation in my environment. The left part (2/21-2/24) 
> leaves max open files at -1, the right part (2/24 till now) sets max open files 
> to 300. The memory growth is very different.
> !image-2023-02-26-12-08-49-717.png|width=616,height=285!
>  
> I have also attached a jeprof for the 2/21-2/24 instance; the TM memory size is 
> about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about 
> 1GB. I think the remaining part (8-2.6-1) is occupied by fds.
>  
> I suggest setting this to a default value like 500.
>  
>  
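A minimal sketch of the proposed mitigation from the job side, using the option name quoted in the description; the value 500 is the suggested default, the class name is illustrative, and this is not code from the ticket:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedOpenFilesSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Cap RocksDB's open files (and thus its table cache) instead of the
        // unbounded default of -1.
        conf.setInteger("state.backend.rocksdb.files.open", 500);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build and execute the job as usual
    }
}
{code}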



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31225) rocksdb max open file can lead to oom

2023-02-27 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693953#comment-17693953
 ] 

xiaogang zhou commented on FLINK-31225:
---

[~yunta] Master

I logged onto the container, listed the directory and found:

/tmp/flink-io-b7483fcd-50aa-45d2-bd8e-a2b0862c6323/job__op_LegacyKeyedCoProcessOperator_a3bf5557de0062839a60e12819947e17__12_50__uuid_b91b9766-b8af-4de3-9a01-b5dcce29a7bf/db#
 ll | grep sst
-rw-r--r-- 1 flink flink 30662931 Feb 27 18:20 002512.sst
-rw-r--r-- 1 flink flink  1788364 Feb 27 18:30 002513.sst
-rw-r--r-- 1 flink flink  3209306 Feb 27 18:40 002515.sst
-rw-r--r-- 1 flink flink 13443570 Feb 27 18:40 002517.sst
-rw-r--r-- 1 flink flink  1694438 Feb 27 18:50 002518.sst
-rw-r--r-- 1 flink flink  1509487 Feb 27 18:50 002519.sst

 

I think the LSM tree merge process will delete L0 files.

 

And as mentioned above, someone raised the same point in 
[https://github.com/facebook/rocksdb/issues/4112]:

 

This is not exactly a leak, but a lot of memory is allocated and never released:

const int table_cache_size = (mutable_db_options_.max_open_files == -1)
    ? TableCache::kInfiniteCapacity
    : mutable_db_options_.max_open_files - 10;
table_cache_ = NewLRUCache(table_cache_size,
                           immutable_db_options_.table_cache_numshardbits);

All allocated records are stored in this cache. mutable_db_options_.max_open_files is equal to -1, so table_cache_size = kInfiniteCapacity (about 4M).

 

It seems that with mutable_db_options_.max_open_files = -1, RocksDB keeps TableReader 
objects in memory, which causes memory to keep growing.

> rocksdb max open file can lead to oom 
> --
>
> Key: FLINK-31225
> URL: https://issues.apache.org/jira/browse/FLINK-31225
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-26-12-08-49-717.png, leak_test(2).png
>
>
> the default value for 
> state.backend.rocksdb.files.open
> is -1
>  
> [https://github.com/facebook/rocksdb/issues/4112] this issue tells us that 
> rocksdb will not close fds, so this can lead to an oom issue.
>  
> I can also reproduce the situation in my environment: the left part (2/21 - 2/24) 
> leaves max open files at -1, the right part (2/24 till now) sets max open files 
> to 300. The memory growth is very different.
> !image-2023-02-26-12-08-49-717.png|width=616,height=285!
>  
> I have also attached a jeprof for the 2/21 - 2/24 instance: the tm memory size is 
> about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about 
> 1GB. I think the remaining part (8 - 2.6 - 1) is occupied by fds.
>  
> I suggest setting this to a default value like 500.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31225) rocksdb max open file can lead to oom

2023-02-26 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-31225:
--
Description: 
the default value for 

state.backend.rocksdb.files.open

is -1

 

[https://github.com/facebook/rocksdb/issues/4112] this issue tells us that 
rocksdb will not close fds, so this can lead to an oom issue.

 

I can also reproduce the situation in my environment: the left part (2/21 - 2/24) 
leaves max open files at -1, the right part (2/24 till now) sets max open files 
to 300. The memory growth is very different.

!image-2023-02-26-12-08-49-717.png|width=616,height=285!

 

I have also attached a jeprof for the 2/21 - 2/24 instance: the tm memory size is 
about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about 
1GB. I think the remaining part (8 - 2.6 - 1) is occupied by fds.

 

I suggest setting this to a default value like 500.

 

 

  was:
the default value for 

state.backend.rocksdb.files.open

is -1

 

[https://github.com/facebook/rocksdb/issues/4112] this issue tells us that 
rocksdb will not close fds, so this can lead to an oom issue.

 

I can also reproduce the situation in my environment: the left part (2/21 - 2/24) 
leaves max open files at -1, the right part (2/24 till now) sets max open files 
to 300. The memory growth is very different.

!image-2023-02-26-12-08-49-717.png|width=616,height=285!

 

I have also attached a jeprof for the 2/21 - 2/24 instance: the tm memory size is 
about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about 
1GB. I think the remaining part (8 - 2.6 - 1) is occupied by fds.

 

I suggest setting this to a default value like 500.

 

 


> rocksdb max open file can lead to oom 
> --
>
> Key: FLINK-31225
> URL: https://issues.apache.org/jira/browse/FLINK-31225
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-26-12-08-49-717.png, leak_test(2).png
>
>
> the default value for 
> state.backend.rocksdb.files.open
> is -1
>  
> [https://github.com/facebook/rocksdb/issues/4112] this issue tells us that 
> rocksdb will not close fds, so this can lead to an oom issue.
>  
> I can also reproduce the situation in my environment: the left part (2/21 - 2/24) 
> leaves max open files at -1, the right part (2/24 till now) sets max open files 
> to 300. The memory growth is very different.
> !image-2023-02-26-12-08-49-717.png|width=616,height=285!
>  
> I have also attached a jeprof for the 2/21 - 2/24 instance: the tm memory size is 
> about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about 
> 1GB. I think the remaining part (8 - 2.6 - 1) is occupied by fds.
>  
> I suggest setting this to a default value like 500.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31225) rocksdb max open file can lead to oom

2023-02-25 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-31225:
--
Description: 
the default value for 

state.backend.rocksdb.files.open

is -1

 

[https://github.com/facebook/rocksdb/issues/4112] this issue tells us that 
rocksdb will not close fds, so this can lead to an oom issue.

 

I can also reproduce the situation in my environment: the left part (2/21 - 2/24) 
leaves max open files at -1, the right part (2/24 till now) sets max open files 
to 300. The memory growth is very different.

!image-2023-02-26-12-08-49-717.png|width=616,height=285!

 

I have also attached a jeprof for the 2/21 - 2/24 instance: the tm memory size is 
about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about 
1GB. I think the remaining part (8 - 2.6 - 1) is occupied by fds.

 

I suggest setting this to a default value like 500.

 

 

  was:
the default value for 

state.backend.rocksdb.files.open

is -1

 

[https://github.com/facebook/rocksdb/issues/4112] this issue tells us that 
rocksdb will not close fds, so this can lead to an oom issue.

 

I can also reproduce the situation in my environment: the left part (2/21 - 2/24) 
leaves max open files at -1, the right part (2/24 till now) sets max open files 
to 300. The memory growth is very different.

!image-2023-02-26-12-08-49-717.png|width=616,height=285!

 

I suggest setting this to a default value like 500.

 

 


> rocksdb max open file can lead to oom 
> --
>
> Key: FLINK-31225
> URL: https://issues.apache.org/jira/browse/FLINK-31225
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-26-12-08-49-717.png, leak_test(2).png
>
>
> the default value for 
> state.backend.rocksdb.files.open
> is -1
>  
> [https://github.com/facebook/rocksdb/issues/4112] this issue tells us that 
> rocksdb will not close fds, so this can lead to an oom issue.
>  
> I can also reproduce the situation in my environment: the left part (2/21 - 2/24) 
> leaves max open files at -1, the right part (2/24 till now) sets max open files 
> to 300. The memory growth is very different.
> !image-2023-02-26-12-08-49-717.png|width=616,height=285!
>  
> I have also attached a jeprof for the 2/21 - 2/24 instance: the tm memory size is 
> about 8GB, heap memory is about 2.6GB, and the native part in leak_test is about 
> 1GB. I think the remaining part (8 - 2.6 - 1) is occupied by fds.
>  
> I suggest setting this to a default value like 500.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31225) rocksdb max open file can lead to oom

2023-02-25 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-31225:
--
Attachment: leak_test(2).png

> rocksdb max open file can lead to oom 
> --
>
> Key: FLINK-31225
> URL: https://issues.apache.org/jira/browse/FLINK-31225
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-26-12-08-49-717.png, leak_test(2).png
>
>
> the default value for 
> state.backend.rocksdb.files.open
> is -1
>  
> [https://github.com/facebook/rocksdb/issues/4112] this issue tells us that 
> rocksdb will not close fds, so this can lead to an oom issue.
>  
> I can also reproduce the situation in my environment: the left part (2/21 - 2/24) 
> leaves max open files at -1, the right part (2/24 till now) sets max open files 
> to 300. The memory growth is very different.
> !image-2023-02-26-12-08-49-717.png|width=616,height=285!
>  
> I suggest setting this to a default value like 500.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31225) rocksdb max open file can lead to oom

2023-02-25 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-31225:
--
Description: 
the default value for 

state.backend.rocksdb.files.open

is -1

 

[https://github.com/facebook/rocksdb/issues/4112] this issue tells us that 
rocksdb will not close fds, so this can lead to an oom issue.

 

I can also reproduce the situation in my environment: the left part (2/21 - 2/24) 
leaves max open files at -1, the right part (2/24 till now) sets max open files 
to 300. The memory growth is very different.

!image-2023-02-26-12-08-49-717.png|width=616,height=285!

 

I suggest setting this to a default value like 500.

 

 

> rocksdb max open file can lead to oom 
> --
>
> Key: FLINK-31225
> URL: https://issues.apache.org/jira/browse/FLINK-31225
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-26-12-08-49-717.png
>
>
> the default value for 
> state.backend.rocksdb.files.open
> is -1
>  
> [https://github.com/facebook/rocksdb/issues/4112] this issue tells us that 
> rocksdb will not close fds, so this can lead to an oom issue.
>  
> I can also reproduce the situation in my environment: the left part (2/21 - 2/24) 
> leaves max open files at -1, the right part (2/24 till now) sets max open files 
> to 300. The memory growth is very different.
> !image-2023-02-26-12-08-49-717.png|width=616,height=285!
>  
> I suggest setting this to a default value like 500.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-31225) rocksdb max open file can lead to oom

2023-02-25 Thread xiaogang zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaogang zhou updated FLINK-31225:
--
Attachment: image-2023-02-26-12-08-49-717.png

> rocksdb max open file can lead to oom 
> --
>
> Key: FLINK-31225
> URL: https://issues.apache.org/jira/browse/FLINK-31225
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-26-12-08-49-717.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit

2023-02-25 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693581#comment-17693581
 ] 

xiaogang zhou edited comment on FLINK-31089 at 2/26/23 4:05 AM:


[~Yanfei Lei] yes, your summary is pretty accurate, except that pinning L0 can improve 
performance while disabling it does not have much influence. But this is not 
the main topic.

 

My job is a DataStream job. My point is to prompt some warning, because a developer may 
forget to set the StateTtlConfig while turning on 
PinTopLevelIndexAndFilter; this can 100% lead to an OOM issue.

 

I have also filed a new issue that can lead to OOM:

https://issues.apache.org/jira/browse/FLINK-31225
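
For reference, here is a minimal sketch of how the TTL could be set on a DataStream state descriptor (assuming Flink 1.16's StateTtlConfig API; the 36-hour TTL and the compaction-filter query interval of 1000 entries are just example values), so that the RocksDB compaction filter gets installed and expired entries are dropped during compaction:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlStateExample {

    public static ValueStateDescriptor<Long> descriptorWithTtl() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(36))                  // expire entries 36h after the last write
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupInRocksdbCompactFilter(1000)         // enable the RocksDB compaction-filter cleanup
                .build();

        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("my-state", Long.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}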


was (Author: zhoujira86):
[~Yanfei Lei] yes, your summary is pretty accurate, except that pinning L0 can improve 
performance while disabling it does not have much influence. But this is not 
the main topic.

 

My job is a DataStream job. My point is to prompt some warning, because a developer may 
forget to set the StateTtlConfig while turning on 
PinTopLevelIndexAndFilter; this can 100% lead to an OOM issue.

 

 

> pin L0 index in memory can lead to slow memory grow finally lead to memory 
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png, 
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png, 
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned 
> memory kept growing (in the pic below, from 48G -> 50G in about 5 hours). But if 
> we switch it to false, we can see the pinned memory stays relatively static. In 
> our environment, a lot of tasks restart because they are killed by k8s for exceeding the memory limit.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>  
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of 
> records per second does not differ a lot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-31225) rocksdb max open file can lead to oom

2023-02-25 Thread xiaogang zhou (Jira)
xiaogang zhou created FLINK-31225:
-

 Summary: rocksdb max open file can lead to oom 
 Key: FLINK-31225
 URL: https://issues.apache.org/jira/browse/FLINK-31225
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Affects Versions: 1.16.1
Reporter: xiaogang zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit

2023-02-25 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17693581#comment-17693581
 ] 

xiaogang zhou commented on FLINK-31089:
---

[~Yanfei Lei] yes, your summary is pretty accurate, except that pinning L0 can improve 
performance while disabling it does not have much influence. But this is not 
the main topic.

 

My job is a DataStream job. My point is to prompt some warning, because a developer may 
forget to set the StateTtlConfig while turning on 
PinTopLevelIndexAndFilter; this can 100% lead to an OOM issue.

 

 

> pin L0 index in memory can lead to slow memory grow finally lead to memory 
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png, 
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png, 
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned 
> memory kept growing (in the pic below, from 48G -> 50G in about 5 hours). But if 
> we switch it to false, we can see the pinned memory stays relatively static. In 
> our environment, a lot of tasks restart because they are killed by k8s for exceeding the memory limit.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>  
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of 
> records per second does not differ a lot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit

2023-02-23 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692566#comment-17692566
 ] 

xiaogang zhou commented on FLINK-31089:
---

some more info:

1. The task with TTL on has been running for a long time without pinned block cache 
growth.

2. We have many tasks running on 1.13, which means they are without the fix 

https://issues.apache.org/jira/browse/FLINK-22957

 

These tasks also have partitioned-index-filters on, and they also hit OOM 
occasionally.

> pin L0 index in memory can lead to slow memory grow finally lead to memory 
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png, 
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png, 
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned 
> memory kept growing (in the pic below, from 48G -> 50G in about 5 hours). But if 
> we switch it to false, we can see the pinned memory stays relatively static. In 
> our environment, a lot of tasks restart because they are killed by k8s for exceeding the memory limit.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>  
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of 
> records per second does not differ a lot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit

2023-02-22 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17692546#comment-17692546
 ] 

xiaogang zhou commented on FLINK-31089:
---

[~yunta] if I turn off PinTopLevelIndexAndFilter, the task cannot run 
correctly, as it takes a lot of time to load the cache. I also found that some rank operators 
do not have a compaction filter in the LOG file.

> pin L0 index in memory can lead to slow memory grow finally lead to memory 
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png, 
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png, 
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned 
> memory kept growing (in the pic below, from 48G -> 50G in about 5 hours). But if 
> we switch it to false, we can see the pinned memory stays relatively static. In 
> our environment, a lot of tasks restart because they are killed by k8s for exceeding the memory limit.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>  
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of 
> records per second does not differ a lot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit

2023-02-20 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690346#comment-17690346
 ] 

xiaogang zhou edited comment on FLINK-31089 at 2/21/23 2:38 AM:


Yes, this is not large at startup, but it keeps growing, so no matter how 
large the tm memory is, it will eventually OOM.

 

Now I have started another task with setPinL0FilterAndIndexBlocksInCache set to true, 
which grows faster. I will collect another visual profile at the 
weekend and post it here.

 

 


was (Author: zhoujira86):
Yes, this is not large at startup, but it keeps growing, so no matter how 
large the tm memory is, it will eventually OOM.

 

Now I have started another task with setPinL0FilterAndIndexBlocksInCache set to true, 
which grows faster. I will collect another visual profile at the 
weekend and post it here.

 

And I think it would be convenient to communicate via dingding; I am at an 
e-commerce company in Shanghai, in charge of Flink. You can send a dingding message to my 
mail zhou16...@163.com. Maybe I can get some answers from your side :)

> pin L0 index in memory can lead to slow memory grow finally lead to memory 
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png, 
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png, 
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned 
> memory kept growing (in the pic below, from 48G -> 50G in about 5 hours). But if 
> we switch it to false, we can see the pinned memory stays relatively static. In 
> our environment, a lot of tasks restart because they are killed by k8s for exceeding the memory limit.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>  
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of 
> records per second does not differ a lot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit

2023-02-20 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691338#comment-17691338
 ] 

xiaogang zhou commented on FLINK-31089:
---

[~yunta] Master, after turning on the compaction filter, the pinned block cache size 
stopped growing.

 

Should we add some warning for the situation where 'partitioned-index-filters' is on 
and no ttl is configured?
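
Just to make the proposal concrete, below is a purely illustrative sketch of the kind of startup check this could be. The class and method names are hypothetical and do not exist in Flink; only the config key state.backend.rocksdb.memory.partitioned-index-filters and the observed behavior come from this thread.

import org.slf4j.Logger;

// Hypothetical helper, not Flink code: warn when partitioned index/filters are
// pinned but no state TTL (and hence no compaction filter) is configured.
final class RocksDbConfigSanityCheck {

    static void warnIfPinnedIndexWithoutTtl(boolean partitionedIndexFiltersEnabled,
                                            boolean anyStateTtlConfigured,
                                            Logger log) {
        if (partitionedIndexFiltersEnabled && !anyStateTtlConfigured) {
            log.warn("'state.backend.rocksdb.memory.partitioned-index-filters' is enabled "
                    + "but no state TTL is configured; pinned top-level index/filter blocks "
                    + "may keep growing in the block cache and eventually exceed the memory limit.");
        }
    }
}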

> pin L0 index in memory can lead to slow memory grow finally lead to memory 
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png, 
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png, 
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned 
> memory kept growing (in the pic below, from 48G -> 50G in about 5 hours). But if 
> we switch it to false, we can see the pinned memory stays relatively static. In 
> our environment, a lot of tasks restart because they are killed by k8s for exceeding the memory limit.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>  
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of 
> records per second does not differ a lot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit

2023-02-20 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691153#comment-17691153
 ] 

xiaogang zhou commented on FLINK-31089:
---

I created another task with ttl on, and will keep monitoring the memory growth.

> pin L0 index in memory can lead to slow memory grow finally lead to memory 
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png, 
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png, 
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned 
> memory kept growing (in the pic below, from 48G -> 50G in about 5 hours). But if 
> we switch it to false, we can see the pinned memory stays relatively static. In 
> our environment, a lot of tasks restart because they are killed by k8s for exceeding the memory limit.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>  
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of 
> records per second does not differ a lot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit

2023-02-20 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691114#comment-17691114
 ] 

xiaogang zhou edited comment on FLINK-31089 at 2/20/23 10:10 AM:
-

[~yunta] 

 

I found this in the rocksdb LOG:

 

2023/02/20-17:55:33.357582 7f4092f42700        Options.compaction_filter: None
2023/02/20-17:55:33.357583 7f4092f42700        
Options.compaction_filter_factory: None

 

could this lead to the index 'oom' issue?


was (Author: zhoujira86):
I found this in the rocksdb LOG:

 

2023/02/20-17:55:33.357582 7f4092f42700        Options.compaction_filter: None
2023/02/20-17:55:33.357583 7f4092f42700        
Options.compaction_filter_factory: None

 

could this lead to the index 'oom' issue?

> pin L0 index in memory can lead to slow memory grow finally lead to memory 
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png, 
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png, 
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned 
> memory kept growing (in the pic below, from 48G -> 50G in about 5 hours). But if 
> we switch it to false, we can see the pinned memory stays relatively static. In 
> our environment, a lot of tasks restart because they are killed by k8s for exceeding the memory limit.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>  
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of 
> records per second does not differ a lot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit

2023-02-20 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17691114#comment-17691114
 ] 

xiaogang zhou commented on FLINK-31089:
---

I found this in the rocksdb LOG:

 

2023/02/20-17:55:33.357582 7f4092f42700        Options.compaction_filter: None
2023/02/20-17:55:33.357583 7f4092f42700        
Options.compaction_filter_factory: None

 

could this lead to the index 'oom' issue?

> pin L0 index in memory can lead to slow memory grow finally lead to memory 
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png, 
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png, 
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned 
> memory kept growing (in the pic below, from 48G -> 50G in about 5 hours). But if 
> we switch it to false, we can see the pinned memory stays relatively static. In 
> our environment, a lot of tasks restart because they are killed by k8s for exceeding the memory limit.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>  
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of 
> records per second does not differ a lot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-31089) pin L0 index in memory can lead to slow memory grow finally lead to memory beyond limit

2023-02-19 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17690999#comment-17690999
 ] 

xiaogang zhou edited comment on FLINK-31089 at 2/20/23 4:56 AM:


[~yunta] 

Got some updates with L0 pin on; see the attached l0pin_open.

 

 

 


was (Author: zhoujira86):
[~yunta] 

Got some updates with L0 pin on; see the attached l0pin_open.

 

I configured table.exec.state.ttl to 36 hrs. I suspect it does not actually 
change the rocksdb default ttl configuration?

 

> pin L0 index in memory can lead to slow memory grow finally lead to memory 
> beyond limit
> ---
>
> Key: FLINK-31089
> URL: https://issues.apache.org/jira/browse/FLINK-31089
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Affects Versions: 1.16.1
>Reporter: xiaogang zhou
>Priority: Major
> Attachments: image-2023-02-15-20-26-58-604.png, 
> image-2023-02-15-20-32-17-993.png, image-2023-02-17-16-48-59-535.png, 
> l0pin_open.png
>
>
> with setPinL0FilterAndIndexBlocksInCache set to true, we can see the pinned 
> memory kept growing (in the pic below, from 48G -> 50G in about 5 hours). But if 
> we switch it to false, we can see the pinned memory stays relatively static. In 
> our environment, a lot of tasks restart because they are killed by k8s for exceeding the memory limit.
> !image-2023-02-15-20-26-58-604.png|width=899,height=447!
>  
> !image-2023-02-15-20-32-17-993.png|width=853,height=464!
> the two graphs were recorded yesterday and today, which means the number of 
> records per second does not differ a lot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

