[GitHub] [flink] flinkbot commented on pull request #21743: [FLINK-15325][coordination] Ignores the input locations of a ConsumePartitionGroup if the corresponding ConsumerVertexGroup is too large

2023-01-20 Thread GitBox


flinkbot commented on PR #21743:
URL: https://github.com/apache/flink/pull/21743#issuecomment-1398746472

   
   ## CI report:
   
   * 9118095bdfdf184466f83661e8b39661c7075181 UNKNOWN
   
   
   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zhuzhurk opened a new pull request, #21743: [FLINK-15325][coordination] Ignores the input locations of a ConsumePartitionGroup if the corresponding ConsumerVertexGroup is too large

2023-01-20 Thread GitBox


zhuzhurk opened a new pull request, #21743:
URL: https://github.com/apache/flink/pull/21743

   ## What is the purpose of the change
   
   This change improves the `DefaultPreferredLocationsRetriever` so that it ignores the input locations of a ConsumedPartitionGroup if the corresponding ConsumerVertexGroup is too large. This helps avoid tasks being unevenly distributed across nodes when running batch jobs, or when running jobs in session/standalone mode, because otherwise the consumers tend to be placed on the same node as the input vertex.
   
   ## Brief change log
   
 - Changed `EdgeManagerBuildUtil` to set the 
ConsumedPartitionGroup/ConsumerVertexGroup to its corresponding 
ConsumerVertexGroup/ConsumedPartitionGroup
 - Changed `DefaultPreferredLocationsRetriever` to ignore the input locations of a ConsumedPartitionGroup if the corresponding ConsumerVertexGroup is too large (compared to the ConsumedPartitionGroup)
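
   The heuristic described above can be sketched as follows. This is a hypothetical illustration, not the PR's actual code: the class name, method shape, and the ratio threshold of 4 are all assumptions made here for clarity.

```java
import java.util.Collection;
import java.util.Optional;

/** Hypothetical sketch of the location-preference heuristic (not the PR's actual code). */
class PreferredLocationSketch {
    // Assumed threshold: ignore input locations once the consumer vertex group
    // is this many times larger than the consumed partition group.
    static final int CONSUMER_TO_PARTITION_RATIO_LIMIT = 4;

    /**
     * Returns the producers' locations as preferred locations, or an empty result
     * when the consumer vertex group is too large relative to the consumed
     * partition group, so that the scheduler spreads the consumers evenly instead.
     */
    static Optional<Collection<String>> preferredLocations(
            Collection<String> producerLocations,
            int consumedPartitionGroupSize,
            int consumerVertexGroupSize) {
        if (consumerVertexGroupSize
                > CONSUMER_TO_PARTITION_RATIO_LIMIT * consumedPartitionGroupSize) {
            return Optional.empty(); // too many consumers would pile onto the producers' nodes
        }
        return Optional.of(producerLocations);
    }
}
```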
   
   ## Verifying this change
   
 - *Added unit tests in EdgeManagerBuildUtilTest and 
DefaultPreferredLocationsRetrieverTest*
 - *The change is also covered by existing tests of preferred location 
retriever*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (**yes** / no / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[GitHub] [flink] MartijnVisser commented on pull request #18823: [FLINK-20625][pubsub,e2e] Add PubSubSource connector using FLIP-27

2023-01-20 Thread GitBox


MartijnVisser commented on PR #18823:
URL: https://github.com/apache/flink/pull/18823#issuecomment-1398705483

   @RyanSkraba It would indeed be nice if we could move this to the externalized repo now. @dchristle It would be great if you could help validate it so we can move it forward.





[GitHub] [flink] XComp commented on a diff in pull request #21742: [FLINK-30765][runtime] Aligns the LeaderElectionService.stop() contract

2023-01-20 Thread GitBox


XComp commented on code in PR #21742:
URL: https://github.com/apache/flink/pull/21742#discussion_r1082832944


##
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##
@@ -100,6 +100,7 @@ public final void stop() throws Exception {
         if (!running) {
             return;
         }
+        leaderContender.revokeLeadership();

Review Comment:
   Ok, digging into the PR a bit more shows that the old [ZooKeeperLeaderElectionService](https://github.com/apache/flink/pull/13644/files?show-deleted-files=true=true%5B%5D=#diff-de2fa2be46667dad62b57be6fefa6135043e19888afd453b634792241785c8e0L147) didn't call the revoke either, so I guess this just keeps the implementation as it was.
   
   But in general, from a theoretical standpoint: we do want to consistently call revoke during shutdown so that the cleanup is triggered through revokeLeadership and we stick to the right protocol. That would, as a consequence, make the cleanup in the `LeaderContender.stop()` implementation obsolete. WDYT? :thinking:
   From what I've seen in the code, every implementation essentially runs redundant cleanup code in both `stop()` and `revokeLeadership()`.
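
   A minimal sketch of the contract being discussed — `stop()` consistently revoking leadership so that contenders only need cleanup logic in one place. The names here are stand-ins, not Flink's actual classes:

```java
/** Stand-in for Flink's LeaderContender; only the revoke path matters here. */
interface SketchContender {
    void revokeLeadership();
}

/** Hypothetical service whose stop() always revokes, as proposed above. */
class SketchLeaderElectionService {
    private final SketchContender contender;
    private volatile boolean running = true;

    SketchLeaderElectionService(SketchContender contender) {
        this.contender = contender;
    }

    public void stop() {
        if (!running) {
            return; // idempotent: a second stop() must not revoke again
        }
        running = false;
        contender.revokeLeadership(); // all cleanup funnels through the revoke path
    }
}
```

   With this shape, a `LeaderContender.stop()` implementation no longer needs its own artifact cleanup.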









[GitHub] [flink] zentol merged pull request #21741: [FLINK-30764][docs] Include generic delegation token params in the main config page

2023-01-20 Thread GitBox


zentol merged PR #21741:
URL: https://github.com/apache/flink/pull/21741





[GitHub] [flink] XComp commented on a diff in pull request #21742: [FLINK-30765][runtime] Aligns the LeaderElectionService.stop() contract

2023-01-20 Thread GitBox


XComp commented on code in PR #21742:
URL: https://github.com/apache/flink/pull/21742#discussion_r1082817072


##
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##
@@ -100,6 +100,7 @@ public final void stop() throws Exception {
         if (!running) {
             return;
         }
+        leaderContender.revokeLeadership();

Review Comment:
   @wangyang0918 Was there a rationale for omitting the leadership revocation when stopping the service? We even added a dedicated test for that in `DefaultLeaderElectionServiceTest`. :thinking: 
   
   To help jog the memory: the commit belongs to PR #13644 (FLINK-19542).






[GitHub] [flink] XComp commented on pull request #21737: [FLINK-30760][runtime] Fixes wrongly used @GuardedBy annotation

2023-01-20 Thread GitBox


XComp commented on PR #21737:
URL: https://github.com/apache/flink/pull/21737#issuecomment-1398645766

   @flinkbot run azure





[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-20 Thread GitBox


1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082757960


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##
@@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Flag to enable approximate local recovery.");
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of subtasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when unaligned checkpoint is enabled. "
+                                    + "Each subtask will create a new channel state file when this is configured to 1.");

Review Comment:
   Sounds good to me.
   
   Thanks for the suggestion, I will update it asap.






[GitHub] [flink] dawidwys commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-20 Thread GitBox


dawidwys commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082743417


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##
@@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Flag to enable approximate local recovery.");
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of subtasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when unaligned checkpoint is enabled. "
+                                    + "Each subtask will create a new channel state file when this is configured to 1.");

Review Comment:
   Makes sense to me






[GitHub] [flink] gaborgsomogyi commented on pull request #21741: [FLINK-30764][docs] Include generic delegation token params in the main config page

2023-01-20 Thread GitBox


gaborgsomogyi commented on PR #21741:
URL: https://github.com/apache/flink/pull/21741#issuecomment-1398600117

   cc @mbalassi @zentol 





[GitHub] [flink] flinkbot commented on pull request #21742: [FLINK-30765][runtime] Aligns the LeaderElectionService.stop() contract

2023-01-20 Thread GitBox


flinkbot commented on PR #21742:
URL: https://github.com/apache/flink/pull/21742#issuecomment-1398477646

   
   ## CI report:
   
   * 9afce3d1052ab18c689e446302a91201c84fadef UNKNOWN
   
   
   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] XComp opened a new pull request, #21742: [FLINK-30765][runtime] Aligns the LeaderElectionService.stop() contract

2023-01-20 Thread GitBox


XComp opened a new pull request, #21742:
URL: https://github.com/apache/flink/pull/21742

   ## What is the purpose of the change
   
   This PR is about hardening the `LeaderElectionService.stop()` contract.
   
   The current implementations of LeaderElectionService do not implement the stop() call consistently. Some (e.g. [StandaloneLeaderElectionService](https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java#L53)) call revoke on the LeaderContender, whereas others (e.g. [DefaultLeaderElectionService](https://github.com/apache/flink/blob/6e1caa390882996bf2d602951b54e4bb2d9c90dc/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java#L96)) don't. The [MultipleComponentLeaderElectionService](https://github.com/apache/flink/blob/0290715a57b8d243586ab747b0cd2416c8081012/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultMultipleComponentLeaderElectionService.java#L166) does call revoke on the LeaderContender instances, though.
   
   We should align this behavior and specify it in the LeaderElectionService 
contract before going ahead with refactoring the interfaces 
([FLIP-285](https://cwiki.apache.org/confluence/display/FLINK/FLIP-285%3A+Refactoring+LeaderElection+to+make+Flink+support+multi-component+leader+election+out-of-the-box)).
   
   ## Brief change log
   
   * Updated the JavaDoc in `LeaderElectionService.stop()` to specify the 
contract
   * Added `LeaderContender.revokeLeadership()` call to implementations that 
missed that call before
   
   ## Verifying this change
   
   The `LeaderContender.revokeLeadership()` call was also added to 
`TestingLeaderElectionService` to make each test rely on this contract.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? JavaDocs
   





[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-20 Thread GitBox


pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082600527


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##
@@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Flag to enable approximate local recovery.");
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of subtasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when unaligned checkpoint is enabled. "
+                                    + "Each subtask will create a new channel state file when this is configured to 1.");

Review Comment:
   It's only for unaligned checkpoints. There is a possibility we will have 
more options, but not very likely. So maybe 
`execution.checkpointing.unaligned.max-subtasks-per-channel-state-file`?
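
   For illustration, the rename proposed here would look roughly like the fragment below. This is a sketch of the reviewer's proposal, not the merged code — the final key, default, and description may differ:

```java
// Sketch only: option key as proposed in this review thread (hypothetical).
public static final ConfigOption<Integer> MAX_SUBTASKS_PER_CHANNEL_STATE_FILE =
        ConfigOptions.key("execution.checkpointing.unaligned.max-subtasks-per-channel-state-file")
                .intType()
                .defaultValue(5)
                .withDescription(
                        "Maximum number of subtasks that may share the same channel state file.");
```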






[GitHub] [flink] flinkbot commented on pull request #21741: [FLINK-30764][docs] Include generic delegation token params in the main config page

2023-01-20 Thread GitBox


flinkbot commented on PR #21741:
URL: https://github.com/apache/flink/pull/21741#issuecomment-1398454630

   
   ## CI report:
   
   * 79d653c444716cf88e0b22b08333c2feb50ac872 UNKNOWN
   
   
   ## Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] gaborgsomogyi commented on pull request #21732: [FLINK-30754][tests] Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test issues

2023-01-20 Thread GitBox


gaborgsomogyi commented on PR #21732:
URL: https://github.com/apache/flink/pull/21732#issuecomment-1398453852

   Resolved the conflicts





[GitHub] [flink] gaborgsomogyi commented on pull request #21741: [FLINK-30764][docs] Include generic delegation token params in the main config page

2023-01-20 Thread GitBox


gaborgsomogyi commented on PR #21741:
URL: https://github.com/apache/flink/pull/21741#issuecomment-1398450836

   Addressing the 
[following](https://github.com/apache/flink/pull/21723#issuecomment-1387339515) 
comment in this PR.





[GitHub] [flink] healchow commented on a diff in pull request #21722: [FLINK-30747][docs]Translate "Window Deduplication" page of "Querys"

2023-01-20 Thread GitBox


healchow commented on code in PR #21722:
URL: https://github.com/apache/flink/pull/21722#discussion_r1082589298


##
docs/content.zh/docs/dev/table/sql/queries/window-deduplication.md:
##
@@ -101,15 +102,16 @@ Flink SQL> SELECT *
 
+--+---+--+-+--+--++
 ```
 
-*Note: in order to better understand the behavior of windowing, we simplify 
the displaying of timestamp values to not show the trailing zeros, e.g. 
`2020-04-15 08:05` should be displayed as `2020-04-15 08:05:00.000` in Flink 
SQL Client if the type is `TIMESTAMP(3)`.*
+*注意: 为了更好地理解窗口行为,这里把timestamp值后面的0去掉了.例如:在Flink SQL 
Client中,如果类型是`TIMESTAMP(3)`,`2020-04-15 08:05`应该显示成`2020-04-15 08:05:00.000`.*
+
+## 限制
 
-## Limitation
+## 在窗口表值函数后直接进行窗口去重的限制
 
-### Limitation on Window Deduplication which follows after Windowing TVFs 
directly
-Currently, if Window Deduplication follows after [Windowing TVF]({{< ref 
"docs/dev/table/sql/queries/window-tvf" >}}), the [Windowing TVF]({{< ref 
"docs/dev/table/sql/queries/window-tvf" >}}) has to be with Tumble Windows, Hop 
Windows or Cumulate Windows instead of Session windows. Session windows will be 
supported in the near future.
+目前,Flink只支持在滚动,滑动和累计\[窗口表值函数]\({{< ref "docs/dev/table/sql/queries/window-tvf" 
>}})后进行窗口去重.会话窗口不久之后就会支持.
 
-### Limitation on time attribute of order key
-Currently, Window Deduplication requires order key must be [event time 
attribute]({{< ref "docs/dev/table/concepts/time_attributes" >}}#event-time) 
instead of [processing time attribute]({{< ref 
"docs/dev/table/concepts/time_attributes" >}}#processing-time). Ordering by 
processing-time would be supported in the near future.
+### 根据时间属性排序的限制
 
+目前,Flink只支持根据\[事件时间属性]\({{< ref "docs/dev/table/concepts/time\_attributes" 
>}}#event-time)排序.根据处理时间排序不久之后就会支持.

Review Comment:
   Using the \ escape character here breaks the links (they can no longer be followed).






[GitHub] [flink] gaborgsomogyi commented on pull request #21741: [FLINK-30764][docs] Include generic delegation token params in the main config page

2023-01-20 Thread GitBox


gaborgsomogyi commented on PR #21741:
URL: https://github.com/apache/flink/pull/21741#issuecomment-1398447229

   This is what it looks like:
   https://user-images.githubusercontent.com/18561820/213716824-489f5eca-ae23-4d20-b496-fdd09dc9457d.png





[GitHub] [flink] gaborgsomogyi opened a new pull request, #21741: [FLINK-30764][docs] Include generic delegation token params in the main config page

2023-01-20 Thread GitBox


gaborgsomogyi opened a new pull request, #21741:
URL: https://github.com/apache/flink/pull/21741

   ## What is the purpose of the change
   
   Until now, the generic delegation token params were not shown on the main config page. In this PR I've added them.
   
   ## Brief change log
   
   Included generic delegation token params in the main config page.
   
   ## Verifying this change
   
   Existing tests + manually checked the generated page.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? docs
   





[GitHub] [flink] fsk119 commented on pull request #21717: [FLINK-29945][sql-client] Supports to submit SQL to a embedded SQL Ga…

2023-01-20 Thread GitBox


fsk119 commented on PR #21717:
URL: https://github.com/apache/flink/pull/21717#issuecomment-1398397981

   > P.S. Wrong cmd will get CLI blocked forever.
   
   Thanks for reporting the case. With the commit 0e04c7, the Executor will try to close the Operation in the background when users hit Ctrl+C.





[GitHub] [flink] XComp commented on a diff in pull request #21740: [FLINK-30759][runtime] Merges DispatcherRunnerLeaderElectionLifecycleManager into DefaultDispatcherRunner

2023-01-20 Thread GitBox


XComp commented on code in PR #21740:
URL: https://github.com/apache/flink/pull/21740#discussion_r1082532829


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java:
##
@@ -89,11 +93,29 @@ public CompletableFuture<Void> closeAsync() {
             }
         }
 
+        Exception exception = null;
+        try {
+            leaderElectionService.stop();
+        } catch (Exception e) {
+            exception = e;
+        }
+        final Exception leaderElectionServiceShutdownException = exception;
+
         stopDispatcherLeaderProcess();
 
         FutureUtils.forward(previousDispatcherLeaderProcessTerminationFuture, terminationFuture);
 
-        return terminationFuture;
+        return terminationFuture.whenComplete(
+                (ignoredResult, terminationThrowable) -> {
+                    if (terminationThrowable != null) {
+                        throw new CompletionException(
+                                ExceptionUtils.firstOrSuppressed(
+                                        terminationThrowable,
+                                        leaderElectionServiceShutdownException));

Review Comment:
   Fair point. I don't know why I didn't come up with a simple copy. I updated the code; this also fixed the compilation error where some imports were missing.









[GitHub] [flink] dawidwys commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-20 Thread GitBox


dawidwys commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082525394


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##
@@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Flag to enable approximate local recovery.");
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of subtasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when unaligned checkpoint is enabled. "
+                                    + "Each subtask will create a new channel state file when this is configured to 1.");

Review Comment:
   1. Does it apply to unaligned checkpoints only? If so, could we put it under the `unaligned` subgroup? `execution.checkpointing.unaligned.(...)`
   2. Do we plan to have more options under `channel-state`? If not, I would not introduce this subgroup.
   3. I like @pnowojski's idea more, but I'd add a `max` prefix: `max-subtasks-per-file`
   
   Depending on 2., I'd go with either `channel-state.max-subtasks-per-file` or `max-subtasks-per-channel-state-file`






[GitHub] [flink] zentol commented on a diff in pull request #21740: [FLINK-30759][runtime] Merges DispatcherRunnerLeaderElectionLifecycleManager into DefaultDispatcherRunner

2023-01-20 Thread GitBox


zentol commented on code in PR #21740:
URL: https://github.com/apache/flink/pull/21740#discussion_r1082519363


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java:
##
@@ -89,11 +93,29 @@ public CompletableFuture<Void> closeAsync() {
             }
         }
 
+        Exception exception = null;
+        try {
+            leaderElectionService.stop();
+        } catch (Exception e) {
+            exception = e;
+        }
+        final Exception leaderElectionServiceShutdownException = exception;
+
         stopDispatcherLeaderProcess();
 
         FutureUtils.forward(previousDispatcherLeaderProcessTerminationFuture, terminationFuture);
 
-        return terminationFuture;
+        return terminationFuture.whenComplete(
+                (ignoredResult, terminationThrowable) -> {
+                    if (terminationThrowable != null) {
+                        throw new CompletionException(
+                                ExceptionUtils.firstOrSuppressed(
+                                        terminationThrowable,
+                                        leaderElectionServiceShutdownException));

Review Comment:
   I kinda preferred how this was modeled as a completable future in the 
lifecycle manager.
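For context, the two shapes being weighed can be sketched with plain `java.util.concurrent` types (class and method names here, such as `stopServiceAsync`, are illustrative and not Flink's actual `FutureUtils` API): instead of stashing the `stop()` exception in a local variable and rethrowing it inside `whenComplete`, the shutdown can itself be modeled as a `CompletableFuture` and combined with the termination future:

```java
import java.util.concurrent.CompletableFuture;

// Hedged sketch of the future-based alternative: wrap the (possibly failing)
// stop() call in its own CompletableFuture and combine it with the
// termination future via allOf, so either failure fails the combined close.
class ShutdownCombineDemo {

    static CompletableFuture<Void> stopServiceAsync(Runnable stop) {
        try {
            stop.run();
            return CompletableFuture.completedFuture(null);
        } catch (Exception e) {
            // Capture the shutdown failure in the future instead of a local.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> terminationFuture = CompletableFuture.completedFuture(null);

        CompletableFuture<Void> serviceShutdown =
                stopServiceAsync(() -> { throw new IllegalStateException("stop failed"); });

        // Both futures must complete for the overall close to finish; a
        // failure in either one makes the combined future exceptional.
        CompletableFuture<Void> combined =
                CompletableFuture.allOf(terminationFuture, serviceShutdown);

        if (!combined.isCompletedExceptionally()) {
            throw new AssertionError("combined close should carry the stop() failure");
        }
        System.out.println("combined close completed exceptionally, as expected");
    }
}
```

Either shape surfaces a `stop()` failure to the caller of `closeAsync()`; the future-based variant composes more naturally if further cleanup stages are added later.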






[GitHub] [flink] XComp commented on a diff in pull request #21737: [FLINK-30760][runtime] Fixes wrongly used @GuardedBy annotation

2023-01-20 Thread GitBox


XComp commented on code in PR #21737:
URL: https://github.com/apache/flink/pull/21737#discussion_r1082508795


##
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##
@@ -48,9 +48,11 @@
 private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
 /** The leader contender which applies for leadership. */
+@GuardedBy("lock")
 private volatile LeaderContender leaderContender;

Review Comment:
   That's also my thought. The FLIP-285 changes make these things go away, 
anyway. I'm gonna leave the comment for each of these fields to clarify when 
they are non-null.
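As a rough illustration of the pattern being discussed (assuming the usual JSR-305 / `javax.annotation.concurrent` semantics; the annotations are reduced to comments so the snippet stays dependency-free): a `volatile` field that is written only while holding a lock and is `null` until `start()` is called.

```java
// Sketch of a field that is conceptually @GuardedBy("lock") and @Nullable:
// all writes happen under `lock`, while `volatile` keeps lock-free reads safe.
class GuardedFieldDemo {

    private final Object lock = new Object();

    // conceptually: @GuardedBy("lock") @Nullable
    private volatile String contender;

    public void start(String newContender) {
        synchronized (lock) {
            if (contender != null) {
                throw new IllegalStateException("already started");
            }
            contender = newContender;
        }
    }

    public boolean hasContender() {
        // Read without the lock is safe because the field is volatile.
        return contender != null;
    }

    public static void main(String[] args) {
        GuardedFieldDemo demo = new GuardedFieldDemo();
        if (demo.hasContender()) {
            throw new AssertionError("contender should be null before start()");
        }
        demo.start("contender-1");
        if (!demo.hasContender()) {
            throw new AssertionError("contender should be set after start()");
        }
        System.out.println("guarded field demo passed");
    }
}
```

The `@GuardedBy` annotation then documents the write-side locking discipline, while `@Nullable` documents the pre-`start()` state of the field.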






[GitHub] [flink] XComp commented on a diff in pull request #21737: [FLINK-30760][runtime] Fixes wrongly used @GuardedBy annotation

2023-01-20 Thread GitBox


XComp commented on code in PR #21737:
URL: https://github.com/apache/flink/pull/21737#discussion_r1082508795


##
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##
@@ -48,9 +48,11 @@
 private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
 /** The leader contender which applies for leadership. */
+@GuardedBy("lock")
 private volatile LeaderContender leaderContender;

Review Comment:
   That's also my thought. The FLIP-285 changes make these things go away, 
anyway.






[GitHub] [flink] hehuiyuan commented on pull request #21731: [FLINK-30679][hive]Fix IndexOutOfBoundsException for Hive lookup join when column pushdown to Hive lookup table source

2023-01-20 Thread GitBox


hehuiyuan commented on PR #21731:
URL: https://github.com/apache/flink/pull/21731#issuecomment-1398347485

   
   @flinkbot run azure





[GitHub] [flink] flinkbot commented on pull request #21740: [FLINK-30759][runtime] Merges DispatcherRunnerLeaderElectionLifecycleManager into DefaultDispatcherRunner

2023-01-20 Thread GitBox


flinkbot commented on PR #21740:
URL: https://github.com/apache/flink/pull/21740#issuecomment-1398335456

   
   ## CI report:
   
   * 46ab3d550b0d2be1b260f7c200ca6d168c7625ae UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] XComp opened a new pull request, #21740: [FLINK-30759][runtime] Merges DispatcherRunnerLeaderElectionLifecycleManager into DefaultDispatcherRunner

2023-01-20 Thread GitBox


XComp opened a new pull request, #21740:
URL: https://github.com/apache/flink/pull/21740

   ## What is the purpose of the change
   
   The entire purpose of this wrapper 
`DispatcherRunnerLeaderElectionLifecycleManager` is to keep the start/stop of 
the `LeaderElectionService` out of the `DefaultDispatcherRunner`. 
FLINK-26522/FLIP-285 will move this logic into the `HighAvailabilityServices`. 
Merging both classes makes the move easier because it aligns it with the other 
`LeaderContender` implementations.
   
   I couldn't find any reason why we need this class. There was a [brief 
discussion in PR 
#9832](https://github.com/apache/flink/pull/9832#discussion_r334866031) for 
FLINK-11843 which introduced this change. But even there, we already discussed 
having a `start()` method, instead.
   
   ## Brief change log
   
   Moved code from `DispatcherRunnerLeaderElectionLifecycleManager` back into 
`DefaultDispatcherRunner`.
   
   ## Verifying this change
   
   * `DefaultDispatcherRunnerTest.testLeaderElectionStarted` was added to check 
the lifecycle explicitly. This test is good to check that the 
`LeaderElectionService` is closed as part of the 
`DefaultDispatcherRunner.close()` call.
   * Various tests within `DefaultDispatcherRunnerTest` would fail without the 
start method being called.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





[GitHub] [flink] zentol commented on a diff in pull request #21737: [FLINK-30760][runtime] Fixes wrongly used @GuardedBy annotation

2023-01-20 Thread GitBox


zentol commented on code in PR #21737:
URL: https://github.com/apache/flink/pull/21737#discussion_r1082457711


##
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##
@@ -48,9 +48,11 @@
 private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
 /** The leader contender which applies for leadership. */
+@GuardedBy("lock")
 private volatile LeaderContender leaderContender;

Review Comment:
   urgh. I'd rather roll these 2 back :see_no_evil: 






[GitHub] [flink] XComp commented on a diff in pull request #21738: [FLINK-30761][coordination,test] Replaces JVM assert with Preconditions in leader election code

2023-01-20 Thread GitBox


XComp commented on code in PR #21738:
URL: https://github.com/apache/flink/pull/21738#discussion_r1082412814


##
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java:
##
@@ -48,7 +49,7 @@ public synchronized CompletableFuture getConfirmationFutur
 
     @Override
     public synchronized void start(LeaderContender contender) {
-        assert (!getStartFuture().isDone());
+        Preconditions.checkState(getStartFuture().isDone());

Review Comment:
   args - good catch. :+1: fixed






[GitHub] [flink-web] mbalassi closed pull request #602: Blog post on the Delegation Token Framework

2023-01-20 Thread GitBox


mbalassi closed pull request #602: Blog post on the Delegation Token Framework
URL: https://github.com/apache/flink-web/pull/602





[GitHub] [flink] XComp commented on a diff in pull request #21737: [FLINK-30760][runtime] Fixes wrongly used @GuardedBy annotation

2023-01-20 Thread GitBox


XComp commented on code in PR #21737:
URL: https://github.com/apache/flink/pull/21737#discussion_r1082408148


##
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##
@@ -48,9 +48,11 @@
 private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
 /** The leader contender which applies for leadership. */
+@GuardedBy("lock")
 private volatile LeaderContender leaderContender;

Review Comment:
   I know, I just hesitated to add this annotation because it would generate 
warnings in several locations within the class. But I added `@Nullable` now for 
the other two fields as well to make it consistent. PTAL - I'm curious whether 
you consider this of too much of a change.






[GitHub] [flink] XComp commented on a diff in pull request #21737: [FLINK-30760][runtime] Fixes wrongly used @GuardedBy annotation

2023-01-20 Thread GitBox


XComp commented on code in PR #21737:
URL: https://github.com/apache/flink/pull/21737#discussion_r1082408148


##
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##
@@ -48,9 +48,11 @@
 private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
 /** The leader contender which applies for leadership. */
+@GuardedBy("lock")
 private volatile LeaderContender leaderContender;

Review Comment:
   I know, I just hesitated to add this annotation because it would generate 
warnings in several locations within the class. But I added `@Nullable` now for 
the other two fields as well to make it consistent. PTAL






[GitHub] [flink] XComp commented on a diff in pull request #21737: [FLINK-30760][runtime] Fixes wrongly used @GuardedBy annotation

2023-01-20 Thread GitBox


XComp commented on code in PR #21737:
URL: https://github.com/apache/flink/pull/21737#discussion_r1082408148


##
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##
@@ -48,9 +48,11 @@
 private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
 /** The leader contender which applies for leadership. */
+@GuardedBy("lock")
 private volatile LeaderContender leaderContender;

Review Comment:
   I know, I just hesitated to add this annotation because it would generate 
warnings in several locations within the class. But I added `@Nullable` now for 
the other two fields as well to make it consistent.






[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-20 Thread GitBox


pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082365656


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##
@@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Flag to enable approximate local recovery.");
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of subtasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each subtask will create a new channel state file when this is configured to 1.");

Review Comment:
   > number-of-subtasks-share-file
   
   This sounds a bit strange in english. Maybe let's rename it to:
   
   > execution.checkpointing.channel-state.subtasks-per-file
   
   ? and renaming the config option and getters as well? @dawidwys maybe you 
have some better idea?






[GitHub] [flink] mbalassi merged pull request #21694: [FLINK-30704][filesystems][s3] Add S3 delegation token support

2023-01-20 Thread GitBox


mbalassi merged PR #21694:
URL: https://github.com/apache/flink/pull/21694





[GitHub] [flink] flinkbot commented on pull request #21739: [FLINK-30673][docs][table] Add documentation for "EXPLAIN PLAN_ADVICE" statement

2023-01-20 Thread GitBox


flinkbot commented on PR #21739:
URL: https://github.com/apache/flink/pull/21739#issuecomment-1398198832

   
   ## CI report:
   
   * e19d7dc89412bb6cd9bcf781a8938e9023e23048 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] LadyForest opened a new pull request, #21739: [FLINK-30673][docs][table] Add documentation for "EXPLAIN PLAN_ADVICE" statement

2023-01-20 Thread GitBox


LadyForest opened a new pull request, #21739:
URL: https://github.com/apache/flink/pull/21739

   ## What is the purpose of the change
   
   This pull request adds documentation for `EXPLAIN PLAN_ADVICE` statement.
   
   
   ## Brief change log
   
   Add doc.
   
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): No
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: No
 - The serializers: No
 - The runtime per-record code paths (performance sensitive): No
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: No
 - The S3 file system connector: No
   
   ## Documentation
   
 - Does this pull request introduce a new feature? Yes
 - If yes, how is the feature documented? docs





[GitHub] [flink] zentol commented on a diff in pull request #21738: [FLINK-30761][coordination,test] Replaces JVM assert with Preconditions in leader election code

2023-01-20 Thread GitBox


zentol commented on code in PR #21738:
URL: https://github.com/apache/flink/pull/21738#discussion_r1082324266


##
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java:
##
@@ -48,7 +49,7 @@ public synchronized CompletableFuture getConfirmationFutur
 
     @Override
     public synchronized void start(LeaderContender contender) {
-        assert (!getStartFuture().isDone());
+        Preconditions.checkState(getStartFuture().isDone());

Review Comment:
   ```suggestion
   Preconditions.checkState(!getStartFuture().isDone());
   ```
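Beyond the inverted condition fixed by the suggestion, the motivation for the PR itself (per its description, FLINK-30761) is that JVM assertions are disabled unless the tests run with `-ea`. A minimal, self-contained sketch of that difference; this local `checkState` mimics, but is not, Flink's `org.apache.flink.util.Preconditions`:

```java
// Demonstrates why a Preconditions-style check is preferred over the JVM
// `assert` keyword: `assert` is a no-op unless the JVM runs with -ea, while
// checkState validates unconditionally.
class CheckStateDemo {

    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    public static void main(String[] args) {
        // Standard idiom: the assignment inside `assert` only executes when
        // assertions are enabled; under a plain `java` invocation it is skipped.
        boolean assertsEnabled = false;
        assert assertsEnabled = true;
        System.out.println("JVM asserts enabled: " + assertsEnabled);

        // checkState runs regardless of the -ea flag:
        boolean threw = false;
        try {
            checkState(false, "invalid state");
        } catch (IllegalStateException e) {
            threw = true;
        }
        if (!threw) {
            throw new AssertionError("checkState must always throw on a false condition");
        }
        System.out.println("checkState threw unconditionally");
    }
}
```

Run without `-ea`, the `assert`-based variant of such a check is skipped entirely, which is exactly the silent failure mode the PR removes.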
   






[GitHub] [flink] zentol commented on a diff in pull request #21737: [FLINK-30760][runtime] Fixes wrongly used @GuardedBy annotation

2023-01-20 Thread GitBox


zentol commented on code in PR #21737:
URL: https://github.com/apache/flink/pull/21737#discussion_r1082321263


##
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##
@@ -48,9 +48,11 @@
 private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
 /** The leader contender which applies for leadership. */
+@GuardedBy("lock")
 private volatile LeaderContender leaderContender;

Review Comment:
   this is also nullable; see constructor.






[GitHub] [flink-connector-kafka] zentol commented on a diff in pull request #1: [FLINK-30052][Connectors/Kafka] Move existing Kafka connector code from Flink repo to dedicated Kafka repo

2023-01-20 Thread GitBox


zentol commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-kafka/pull/1#discussion_r1082317243


##
flink-connector-kafka/pom.xml:
##
@@ -18,259 +18,292 @@ specific language governing permissions and limitations
 under the License.
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <artifactId>flink-connectors</artifactId>
-        <groupId>org.apache.flink</groupId>
-        <version>1.16-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>flink-connector-kafka</artifactId>

Review Comment:
   can we make the whitespace changes in a separate commit?






[GitHub] [flink] flinkbot commented on pull request #21738: [FLINK-30761][coordination,test] Replaces JVM assert with Preconditions in leader election code

2023-01-20 Thread GitBox


flinkbot commented on PR #21738:
URL: https://github.com/apache/flink/pull/21738#issuecomment-1398166208

   
   ## CI report:
   
   * b6fb5cacd48f5c91c7cf27ea45395696b2eb04f8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink-connector-cassandra] zentol commented on pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-20 Thread GitBox


zentol commented on PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#issuecomment-1398165894

   > So I think I'll try to reuse this code, migrate it to Flink and update it 
to the latest Cassandra version and push it in this PR.
   
   Sounds good to me so far.





[GitHub] [flink-connector-cassandra] zentol commented on pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-20 Thread GitBox


zentol commented on PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#issuecomment-1398165328

   > I don't think it is an ASF rule but fair enough,
   
   FYI; By and large it should be viewed as a legal requirement. By copying 
code from Cassandra you have to adhere to their licensing, which among other 
things states that you must have prominent notices for changes to a file.





[GitHub] [flink] XComp opened a new pull request, #21738: [FLINK-30761][coordination,test] Replaces JVM assert with Preconditions in leader election code

2023-01-20 Thread GitBox


XComp opened a new pull request, #21738:
URL: https://github.com/apache/flink/pull/21738

   ## What is the purpose of the change
   
   The intention is to make the code more robust to test failures. We do not 
enable asserts in the test runs, and the JVM has asserts disabled by default. That 
would mean that invalid state would go unnoticed if we continue to use assert 
instead of Preconditions.
   
   ## Brief change log
   
   * replaced `assert` with Precondition checks
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





[GitHub] [flink] flinkbot commented on pull request #21737: [FLINK-30760][runtime] Fixes wrongly used @GuardedBy annotation

2023-01-20 Thread GitBox


flinkbot commented on PR #21737:
URL: https://github.com/apache/flink/pull/21737#issuecomment-1398141726

   
   ## CI report:
   
   * 826f33a2bd51e93a65956d5bb50d64c99eb55f3f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] XComp opened a new pull request, #21737: [FLINK-30760][runtime] Fixes wrongly used @GuardedBy annotation

2023-01-20 Thread GitBox


XComp opened a new pull request, #21737:
URL: https://github.com/apache/flink/pull/21737

   ## What is the purpose of the change
   
   The @GuardedBy annotations were assigned to some public methods which are 
not called under the specified lock. @GuardedBy should be used by methods that 
are only allowed to be called within the context of the lock that is specified 
in the annotation.
   
   This is a preparation task for FLINK-26522/FLIP-285 where introduce new 
internal methods that actually need to be properly annotated by the 
`@GuardedBy` annotation.
   
   ## Brief change log
   
   * removes the annotations in methods that are called outside of the lock
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   





[GitHub] [flink] pnowojski commented on pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-20 Thread GitBox


pnowojski commented on PR #20151:
URL: https://github.com/apache/flink/pull/20151#issuecomment-1398086486

    Thanks, it looks good. Once you squash fixup commits I will do the last 
pass and hopefully merge.





[GitHub] [flink] ZhongFuCheng3y commented on pull request #21553: [FLINK-30492][doc] fix incorrect dependency artifactId in hive overview.

2023-01-19 Thread GitBox


ZhongFuCheng3y commented on PR #21553:
URL: https://github.com/apache/flink/pull/21553#issuecomment-1398010881

   > @reswqa @ZhongFuCheng3y Hive requires the 
`flink-table-planner_2.12-<version>.jar`, so for example 
`flink-table-planner_2.12-1.16.0.jar`. See 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hive/overview/#moving-the-planner-jar
   > 
   > That means that if you want to run this locally, you should not include 
`flink-table-api-java-bridge_2.12` but you should 
include flink-table-api-scala-bridge_2.12, per 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/overview/#flink-apis
   > 
   > That's what I tried to say by mention that Hive still requires Scala 
(hence it still has the Scala suffix), which implies that you will also need to 
use all other artifacts in the Scala version.
   
   thanks!





[GitHub] [flink] flinkbot commented on pull request #21736: [FLINK-27518][tests] Refactor migration tests to support version update automatically

2023-01-19 Thread GitBox


flinkbot commented on PR #21736:
URL: https://github.com/apache/flink/pull/21736#issuecomment-1397936174

   
   ## CI report:
   
   * f7edc0bb36dda4c6144c97a73c8167c0e2bf0a0e UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] gaoyunhaii commented on pull request #21736: [FLINK-27518][tests] Refactor migration tests to support version update automatically

2023-01-19 Thread GitBox


gaoyunhaii commented on PR #21736:
URL: https://github.com/apache/flink/pull/21736#issuecomment-1397934193

   @XComp  could you have a look at the PR? Very thanks!





[GitHub] [flink] gaoyunhaii opened a new pull request, #21736: [FLINK-27518][tests] Refactor migration tests to support version update automatically

2023-01-19 Thread GitBox


gaoyunhaii opened a new pull request, #21736:
URL: https://github.com/apache/flink/pull/21736

   ## What is the purpose of the change
   
   This PR refactors the state migration tests so that when cutting a branch, we 
only need to add the new version; the states of the previous version can then be 
generated automatically. 
   
   In general, there are two options:
   1. Similar to the configuration document generator, we could have a module 
that depends on all the modules containing migration tests and run generation 
with this module. 
   2. Introduce tools to generate states, and each module config the tools 
separately. 
   
   We finally chose option 2. This is because Maven has poor support for 
depending on the test classes of other modules; we can only use the 
`test-jar`, which does not support transitive dependencies and makes it hard to 
manage them. 
   
   Besides the generation, during the refactoring we also made each 
migration test use a dynamic version list, `[start, FlinkVersion.last()]`, 
which frees us from manually changing the list for each version when cutting a branch. 
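A dynamic version list like `[start, FlinkVersion.last()]` can be sketched as follows (a simplified, hypothetical stand-in for Flink's actual `FlinkVersion` enum, for illustration only):

```java
import java.util.ArrayList;
import java.util.List;

public class VersionRangeSketch {
    // Illustrative enum; the real FlinkVersion has many more constants.
    enum FlinkVersion {
        V1_15, V1_16, V1_17;

        static FlinkVersion last() {
            FlinkVersion[] all = values();
            return all[all.length - 1];
        }
    }

    /** Returns all versions from {@code start} up to the latest one. */
    static List<FlinkVersion> rangeFrom(FlinkVersion start) {
        List<FlinkVersion> range = new ArrayList<>();
        for (FlinkVersion v : FlinkVersion.values()) {
            if (v.compareTo(start) >= 0) {
                range.add(v);
            }
        }
        return range;
    }

    public static void main(String[] args) {
        // Adding a new enum constant automatically extends every test's list.
        System.out.println(rangeFrom(FlinkVersion.V1_16)); // [V1_16, V1_17]
    }
}
```

With this shape, cutting a release branch only requires adding the new constant to the enum; no per-test list needs editing.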
   
   ## Brief change log
   - Introduce a new framework of migration tests. 
   - Introduce tools to scan the test classes of the configured module and 
generate snapshots.
   - Refactor existing tests based on the new framework. 
   
   ## Verifying this change
   
   Manually verified the process of 
   - Add version 1.18 to `FlinkVersion`.
   - Generating states automatically via `mvn clean package 
-Pgenerate-snapshots -Dgenerate.version=1.17 -nsu -DskipRat -Dcheckstyle.skip 
-Drat.ignoreErrors=true -DspotlessFiles=ineffective -Dfast -DskipTests -pl 
flink-core -am`
   - Run the existing tests and verify that the tests, including the ones against 
1.17, all execute successfully. 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **Yes**
 - If yes, how is the feature documented? **docs**
   





[GitHub] [flink] swuferhong commented on a diff in pull request #21724: [FLINK-30727][table-planner] Fix JoinReorderITCaseBase.testBushyTreeJoinReorder failed

2023-01-19 Thread GitBox


swuferhong commented on code in PR #21724:
URL: https://github.com/apache/flink/pull/21724#discussion_r1082114927


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/join/JoinReorderITCase.java:
##
@@ -42,6 +45,12 @@ public class JoinReorderITCase extends JoinReorderITCaseBase 
{
 
 private StreamExecutionEnvironment env;
 
+@AfterEach
+public void after() {

Review Comment:
   > This is the root cause? Do you reproduce failed test locally?
   
   This error cannot be reproduced locally. I think the root cause is that 
`StreamTestSink.clear();` is not called after the tests finish.
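The failure mode being described can be illustrated with a plain-Java sketch (names are illustrative; this is not Flink's actual `StreamTestSink`):

```java
import java.util.ArrayList;
import java.util.List;

public class StaticSinkSketch {
    // Shared static state, like a global test sink collecting results.
    static final List<String> GLOBAL_RESULTS = new ArrayList<>();

    static void runJob(String record) {
        GLOBAL_RESULTS.add(record);
    }

    /** What an @AfterEach hook should call: reset the shared static state. */
    static void clear() {
        GLOBAL_RESULTS.clear();
    }

    public static void main(String[] args) {
        runJob("a");
        // Without clear(), results from the first "test" would leak into the next.
        clear();
        runJob("b");
        System.out.println(GLOBAL_RESULTS); // [b]
    }
}
```

If `clear()` is skipped between tests, a later test observes records produced by an earlier one, which only surfaces when tests share a JVM on CI.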






[GitHub] [flink] swuferhong commented on pull request #21724: [FLINK-30727][table-planner] Fix JoinReorderITCaseBase.testBushyTreeJoinReorder failed

2023-01-19 Thread GitBox


swuferhong commented on PR #21724:
URL: https://github.com/apache/flink/pull/21724#issuecomment-1397929362

   @flinkbot run azure





[GitHub] [flink] JingGe commented on a diff in pull request #21322: [FLINK-30025][table] Unified the max display column width for SqlClient and Table APi in both Streaming and Batch execMode

2023-01-19 Thread GitBox


JingGe commented on code in PR #21322:
URL: https://github.com/apache/flink/pull/21322#discussion_r1082088415


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -926,7 +926,7 @@ private TableResultInternal 
executeQueryOperation(QueryOperation operation) {
 DataTypeUtils.expandCompositeTypeToSchema(
 
sinkOperation.getConsumedDataType()),
 resultProvider.getRowDataStringConverter(),
-PrintStyle.DEFAULT_MAX_COLUMN_WIDTH,

Review Comment:
   It will be removed while removing deprecated 
sql-client.display.max-column-width. A follow-up ticket has been created: 
https://issues.apache.org/jira/browse/FLINK-30758






[GitHub] [flink] JingGe commented on a diff in pull request #21322: [FLINK-30025][table] Unified the max display column width for SqlClient and Table APi in both Streaming and Batch execMode

2023-01-19 Thread GitBox


JingGe commented on code in PR #21322:
URL: https://github.com/apache/flink/pull/21322#discussion_r1082086891


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java:
##
@@ -102,6 +102,16 @@ private TableConfigOptions() {}
 + "the session time zone is used during 
conversion. The input of option is either a full name "
 + "such as \"America/Los_Angeles\", or a 
custom timezone id such as \"GMT-08:00\".");
 
+@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
+public static final ConfigOption PRINT_MAX_COLUMN_WIDTH =

Review Comment:
   deprecate sql-client.display.max-column-width






[GitHub] [flink] LadyForest commented on pull request #21727: [FLINK-30752][python] Support 'EXPLAIN PLAN_ADVICE' statement in PyFlink

2023-01-19 Thread GitBox


LadyForest commented on PR #21727:
URL: https://github.com/apache/flink/pull/21727#issuecomment-1397885340

   @flinkbot run azure





[GitHub] [flink] lincoln-lil closed pull request #21676: [FLINK-30662][table] Planner supports delete

2023-01-19 Thread GitBox


lincoln-lil closed pull request #21676: [FLINK-30662][table] Planner supports 
delete
URL: https://github.com/apache/flink/pull/21676





[GitHub] [flink] fsk119 commented on a diff in pull request #21717: [FLINK-29945][sql-client] Supports to submit SQL to a embedded SQL Ga…

2023-01-19 Thread GitBox


fsk119 commented on code in PR #21717:
URL: https://github.com/apache/flink/pull/21717#discussion_r1082055861


##
flink-table/flink-sql-client/src/test/resources/sql/table.q:
##
@@ -79,7 +79,10 @@ show tables;
 
 # test SHOW CREATE TABLE
 show create table orders;
-CREATE TABLE `default_catalog`.`default_database`.`orders` (
++-+

Review Comment:
   In the FLIP, we did reach an agreement about the presentation. BTW, I think 
this is only the first version of the presentation for input lines that are too 
long. From my experience, we should support printing the results across multiple lines.
   
   
![image](https://user-images.githubusercontent.com/33114724/213604189-eb68b4fe-f4a1-4b92-b429-e2edaa289657.png)
   
   In PG/Presto, the client uses '+' to indicate that the current line is a 
continuation of the last line. Considering how large the current change already 
is, I think we can defer these improvements to the future.
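The '+' continuation style mentioned above can be sketched like this (a hypothetical helper, not the SQL client's actual printer):

```java
public class LineWrapSketch {
    /** Wraps an over-long value, prefixing continuation lines with '+'. */
    static String wrap(String value, int width) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < value.length(); i += width) {
            if (i > 0) {
                out.append("\n+");
            }
            out.append(value, i, Math.min(i + width, value.length()));
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(wrap("abcdefghij", 4));
        // abcd
        // +efgh
        // +ij
    }
}
```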
   






[GitHub] [flink-ml] jiangxin369 commented on pull request #201: [FLINK-27716] Add Python API docs in ML

2023-01-19 Thread GitBox


jiangxin369 commented on PR #201:
URL: https://github.com/apache/flink-ml/pull/201#issuecomment-1397846787

   @lindong28 Could you help to review this PR?





[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-19 Thread GitBox


1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082041845


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##
@@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements 
ChannelStateWriteRequestEx
 private final Thread thread;
 private volatile Exception thrown = null;
 private volatile boolean wasClosed = false;
-private final String taskName;
+
+private final Map> 
unreadyQueues =
+new ConcurrentHashMap<>();
+
+private final JobID jobID;
+private final Set subtasks;
+private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+private final int numberOfSubtasksShareFile;

Review Comment:
   Sorry, after addressing your next 
[comment](https://github.com/apache/flink/pull/20151#discussion_r1081119337), I 
found we can add the `lock.notifyAll()` during close.
   
   When the executor is closed, the while loop inside `waitAndTakeUnsafe` 
will finish, `waitAndTakeUnsafe` will return `null`, and the 
`executor thread` can terminate.






[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-19 Thread GitBox


1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082043383


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##
@@ -107,49 +119,78 @@ void run() {
 try {
 closeAll(
 this::cleanupRequests,
-() ->
-dispatcher.fail(
-thrown == null ? new 
CancellationException() : thrown));
+() -> {
+Throwable cause;
+synchronized (lock) {
+cause = thrown == null ? new 
CancellationException() : thrown;
+}
+dispatcher.fail(cause);
+});
 } catch (Exception e) {
-//noinspection NonAtomicOperationOnVolatileField
-thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+synchronized (lock) {
+//noinspection NonAtomicOperationOnVolatileField
+thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+}
 }
 FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 }
 LOG.debug("loop terminated");
 }
 
 private void loop() throws Exception {
-while (!wasClosed) {
+while (true) {
 try {
-ChannelStateWriteRequest request = deque.take();
-// The executor will end the registration, when the start 
request comes.
-// Because the checkpoint can be started after all tasks are 
initiated.
-if (request instanceof CheckpointStartRequest && 
isRegistering()) {
-checkState(
-isRegistering.compareAndSet(true, false),
-"Transition isRegistering failed.");
+ChannelStateWriteRequest request;
+boolean completeRegister = false;
+synchronized (lock) {
+if (wasClosed) {
+return;
+}
+request = waitAndTakeUnsafe();
+// The executor will end the registration, when the start 
request comes.
+// Because the checkpoint can be started after all tasks 
are initiated.
+if (request instanceof CheckpointStartRequest) {
+completeRegister = completeRegister();
+}
+}
+if (completeRegister) {
 onRegistered.accept(this);
 }
 dispatcher.dispatch(request);
 } catch (InterruptedException e) {
-if (!wasClosed) {
-LOG.debug(
-"Channel state executor is interrupted while 
waiting for a request (continue waiting)",
-e);
-} else {
-Thread.currentThread().interrupt();
+synchronized (lock) {
+if (!wasClosed) {
+LOG.debug(
+"Channel state executor is interrupted while 
waiting for a request (continue waiting)",
+e);
+} else {
+Thread.currentThread().interrupt();
+}
 }
 }
 }
 }
 
+private ChannelStateWriteRequest waitAndTakeUnsafe() throws 
InterruptedException {
+ChannelStateWriteRequest request;
+while (true) {

Review Comment:
   Thanks for your review, updated.
   
   I didn't squash the commits but added a new fixup commit, so it should be easy to 
review. I can squash them and rebase onto master once you think it's ok.






[GitHub] [flink-connector-kafka] mas-chen commented on pull request #1: [FLINK-30052][Connectors/Kafka] Move existing Kafka connector code from Flink repo to dedicated Kafka repo

2023-01-19 Thread GitBox


mas-chen commented on PR #1:
URL: 
https://github.com/apache/flink-connector-kafka/pull/1#issuecomment-1397828588

   CI now passes on my fork





[GitHub] [flink] flinkbot commented on pull request #21735: [hotfix] [docs] Fix a typo in a query that erroneously tries to perform an aggregation on a string column

2023-01-19 Thread GitBox


flinkbot commented on PR #21735:
URL: https://github.com/apache/flink/pull/21735#issuecomment-1397810857

   
   ## CI report:
   
   * 37175760368c64b665e16e5cf8b7eb4f4e951fee UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] karanasher opened a new pull request, #21735: [hotfix] [docs] Fix a typo in a query that erroneously tries to perform an aggregation on a string column

2023-01-19 Thread GitBox


karanasher opened a new pull request, #21735:
URL: https://github.com/apache/flink/pull/21735

   Fix a typo in a query that erroneously tries to perform an aggregation on a 
string column.





[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

2023-01-19 Thread GitBox


1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082023420


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##
@@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements 
ChannelStateWriteRequestEx
 private final Thread thread;
 private volatile Exception thrown = null;
 private volatile boolean wasClosed = false;
-private final String taskName;
+
+private final Map> 
unreadyQueues =
+new ConcurrentHashMap<>();
+
+private final JobID jobID;
+private final Set subtasks;
+private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+private final int numberOfSubtasksShareFile;

Review Comment:
   This code is about `lock.wait()`: if we add `lock.notifyAll()`, 
`lock.wait()` will return directly, but since the `request` is still null, 
`lock.wait()` will block again. So `lock.notifyAll()` alone doesn't help. 
That's why I think `thread.interrupt()` is enough here.
   
   ```
   private ChannelStateWriteRequest waitAndTakeUnsafe() throws 
InterruptedException {
   ChannelStateWriteRequest request;
   while (true) {
   request = deque.pollFirst();
   if (request == null) {
   lock.wait();
   } else {
   return request;
   }
   }
   }
   ```
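A runnable sketch of this pattern (simplified; not Flink's actual classes) shows why `notifyAll()` with an empty queue just loops back into `wait()`, while an interrupt makes the waiter exit via `InterruptedException`:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class WaitLoopSketch {
    private final Object lock = new Object();
    private final Deque<String> deque = new ArrayDeque<>();

    String waitAndTake() throws InterruptedException {
        synchronized (lock) {
            while (true) {
                String request = deque.pollFirst();
                if (request != null) {
                    return request;
                }
                // Woken by notifyAll(); if the deque is still empty,
                // the loop simply waits again. Only interrupt() breaks out.
                lock.wait();
            }
        }
    }

    void put(String request) {
        synchronized (lock) {
            deque.addLast(request);
            lock.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WaitLoopSketch sketch = new WaitLoopSketch();
        sketch.put("request-1");
        System.out.println(sketch.waitAndTake()); // request-1
    }
}
```

A consumer thread blocked in `waitAndTake()` on an empty deque goes straight back to waiting after a bare `notifyAll()`; calling `consumerThread.interrupt()` is what actually lets it terminate.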






[GitHub] [flink-kubernetes-operator] morhidi closed pull request #463: [FLINK-30119] Breaking change: Flink Kubernetes Operator should store…

2023-01-19 Thread GitBox


morhidi closed pull request #463: [FLINK-30119] Breaking change: Flink 
Kubernetes Operator should store…
URL: https://github.com/apache/flink-kubernetes-operator/pull/463





[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #463: [FLINK-30119] Breaking change: Flink Kubernetes Operator should store…

2023-01-19 Thread GitBox


morhidi commented on PR #463:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/463#issuecomment-1397797194

   After discussing it with Clara, I'm closing this for the time being.





[GitHub] [flink] dchristle commented on pull request #21729: [FLINK-30751] [docs] Remove references to disableDataSync in RocksDB documentation

2023-01-19 Thread GitBox


dchristle commented on PR #21729:
URL: https://github.com/apache/flink/pull/21729#issuecomment-1397381115

   Hi @Myasuka,
   
   Thanks for reviewing this PR! I split the changes into separate commits. The 
first commit removes `disableDataSync`, and the last commit aligns the PyFlink 
RocksDB documentation with its Java equivalent, as you requested. 





[GitHub] [flink] dchristle commented on a diff in pull request #21729: [FLINK-30751] [docs] Remove references to disableDataSync in RocksDB documentation

2023-01-19 Thread GitBox


dchristle commented on code in PR #21729:
URL: https://github.com/apache/flink/pull/21729#discussion_r1081619201


##
flink-python/pyflink/datastream/state_backend.py:
##
@@ -956,16 +956,20 @@ class PredefinedOptions(Enum):
 determined to be beneficial for performance under different settings.
 
 Some of these settings are based on experiments by the Flink community, 
some follow
-guides from the RocksDB project.
+guides from the RocksDB project. Some configurations are enabled 
unconditionally (e.g.
+setUseFsync(false), which disables syncing to storage) so they do not 
appear here. See the
+documentation for the createBaseCommonDBOptions() and 
createBaseCommonColumnOptions() methods

Review Comment:
   That makes sense. I originally made this change to make the PyFlink 
documentation match the equivalent Java documentation closely, but mentioning 
the Java classes and non-public methods probably adds confusion. I made this 
change in the most recent commits.






[GitHub] [flink] RyanSkraba commented on pull request #19970: [FLINK-27970][tests][JUnit5 migration] flink-hadoop-bulk

2023-01-19 Thread GitBox


RyanSkraba commented on PR #19970:
URL: https://github.com/apache/flink/pull/19970#issuecomment-1397364945

   Hello!  I've rebased this PR to master and fixed the merge conflicts in the 
meantime.
   
   I will be mostly away from my computer for a couple of weeks, but I'll check 
in if anything changes!
   





[GitHub] [flink] RyanSkraba commented on pull request #21289: [FLINK-29452] Allow unit tests to be executed independently

2023-01-19 Thread GitBox


RyanSkraba commented on PR #21289:
URL: https://github.com/apache/flink/pull/21289#issuecomment-1397363704

   Hello!  I've rebased this PR to master.  I don't believe there are any 
remaining requested changes that I haven't addressed (by comment or other).
   
   I will be mostly away from my computer for a couple of weeks, but I'll check 
in if anything changes!
   





[GitHub] [flink] dchristle commented on a diff in pull request #21729: [FLINK-30751] [docs] Remove references to disableDataSync in RocksDB documentation

2023-01-19 Thread GitBox


dchristle commented on code in PR #21729:
URL: https://github.com/apache/flink/pull/21729#discussion_r1081578012


##
flink-python/pyflink/datastream/state_backend.py:
##
@@ -1000,21 +1000,24 @@ class PredefinedOptions(Enum):
 
 The following options are set:
 
+- BlockBasedTableConfig.setBlockCacheSize(256 MBytes)
+- BlockBasedTableConfig.setBlockSize(128 KBytes)
+- BlockBasedTableConfig.setFilterPolicy(BloomFilter(
+`BLOOM_FILTER_BITS_PER_KEY`,
+`BLOOM_FILTER_BLOCK_BASED_MODE`)
 - setLevelCompactionDynamicLevelBytes(true)
-- setTargetFileSizeBase(256 MBytes)
+- setMaxBackgroundJobs(4)
 - setMaxBytesForLevelBase(1 GByte)
-- setWriteBufferSize(64 MBytes)
-- setIncreaseParallelism(4)
-- setMinWriteBufferNumberToMerge(3)
-- setMaxWriteBufferNumber(4)
-- setUseFsync(false)
 - setMaxOpenFiles(-1)
-- BlockBasedTableConfig.setBlockCacheSize(256 MBytes)
-- BlockBasedTableConfigsetBlockSize(128 KBytes)
+- setMaxWriteBufferNumber(4)

Review Comment:
   Originally, I wanted to remove just the erroneous `disableDataSync`, but in 
doing this, I noticed other errors, like the missing Bloom filter, or that the 
Python documentation implies `setUseFsync(false)` is set while the Java docs 
don't (not technically a mistake, but it's confusing and not easy to see from the 
code). 
   
   Sorting isn't strictly necessary, but it _was_ part of how I found the Bloom 
filter setting was undocumented & manually verified each group of options now 
matches its description. It lets readers quickly determine whether a particular 
setting they have in mind is changed or not. Since it is easier to see at a 
glance when a new config is added, or an old one removed, future maintainers 
have a lower probability of introducing bugs or making the documentation & code 
diverge like it did for the Bloom filter.
   
   It isn't strictly necessary to sort in this PR, either, but the overhead of 
creating a separate PR/separate JIRA issue seems too high for such a simple 
change. 
   
   I can add these other non-idealities that I fixed into the JIRA issue 
description. Would that be an OK path forward, rather than deferring these 
minor polishes into separate PRs?






[GitHub] [flink] RyanSkraba commented on pull request #19897: [FLINK-27885][tests][JUnit5 migration] flink-csv

2023-01-19 Thread GitBox


RyanSkraba commented on PR #19897:
URL: https://github.com/apache/flink/pull/19897#issuecomment-1397317096

   Hello!  I've rebased this PR to master.  I don't believe there are any 
remaining requested changes that I haven't addressed (by comment or other).
   
   I will be mostly away from my computer for a couple of weeks, but I'll check 
in if anything changes!





[GitHub] [flink] mbalassi commented on pull request #21732: [FLINK-30754][tests] Fix ExceptionThrowingDelegationTokenProvider/Receiver multi-threaded test issues

2023-01-19 Thread GitBox


mbalassi commented on PR #21732:
URL: https://github.com/apache/flink/pull/21732#issuecomment-1397310760

   @flinkbot run azure





[GitHub] [flink] dawidwys commented on a diff in pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one

2023-01-19 Thread GitBox


dawidwys commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1081546445


##
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/MapDataSerializer.java:
##
@@ -227,25 +225,22 @@ public void readSnapshot(int readVersion, DataInputView 
in, ClassLoader userCode
 
 @Override
 public TypeSerializer restoreSerializer() {
-return new MapDataSerializer(
-previousKeyType,
-previousValueType,
-previousKeySerializer,
-previousValueSerializer);
+return new MapDataSerializer(keyType, valueType, keySerializer, 
valueSerializer);
 }
 
 @Override
 public TypeSerializerSchemaCompatibility 
resolveSchemaCompatibility(
-TypeSerializer newSerializer) {
-if (!(newSerializer instanceof MapDataSerializer)) {
+TypeSerializerSnapshot oldSerializerSnapshot) {
+if (!(oldSerializerSnapshot instanceof BaseMapSerializerSnapshot)) 
{

Review Comment:
   Why has the type changed in the check?






[GitHub] [flink] dawidwys commented on a diff in pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one

2023-01-19 Thread GitBox


dawidwys commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1081544400


##
flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/RowDataSerializer.java:
##
@@ -194,31 +193,33 @@ public void readSnapshot(int readVersion, DataInputView 
in, ClassLoader userCode
 @Override
 public RowDataSerializer restoreSerializer() {
 return new RowDataSerializer(
-previousTypes,
-
nestedSerializersSnapshotDelegate.getRestoredNestedSerializers());
+types, 
nestedSerializersSnapshotDelegate.getRestoredNestedSerializers());
 }
 
 @Override
 public TypeSerializerSchemaCompatibility 
resolveSchemaCompatibility(
-TypeSerializer newSerializer) {
-if (!(newSerializer instanceof RowDataSerializer)) {
+TypeSerializerSnapshot oldSerializerSnapshot) {
+if (!(oldSerializerSnapshot instanceof RowDataSerializerSnapshot)) 
{
 return TypeSerializerSchemaCompatibility.incompatible();
 }
 
-RowDataSerializer newRowSerializer = (RowDataSerializer) 
newSerializer;
-if (!Arrays.equals(previousTypes, newRowSerializer.fieldTypes)) {
+RowDataSerializerSnapshot oldRowDataSerializerSnapshot =
+(RowDataSerializerSnapshot) oldSerializerSnapshot;
+if (!Arrays.equals(types, oldRowDataSerializerSnapshot.types)) {
 return TypeSerializerSchemaCompatibility.incompatible();
 }
 
 
CompositeTypeSerializerUtil.IntermediateCompatibilityResult
 intermediateResult =
 
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(
-newRowSerializer.fieldSerializers,
+
oldRowDataSerializerSnapshot.nestedSerializersSnapshotDelegate
+.getNestedSerializerSnapshots(),
 nestedSerializersSnapshotDelegate
 .getNestedSerializerSnapshots());
 
 if (intermediateResult.isCompatibleWithReconfiguredSerializer()) {
-RowDataSerializer reconfiguredCompositeSerializer = 
restoreSerializer();
+org.apache.flink.table.runtime.typeutils.RowDataSerializer

Review Comment:
   Is that correct? This looks wrong.






[GitHub] [flink] dawidwys commented on a diff in pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one

2023-01-19 Thread GitBox


dawidwys commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1081541284


##
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala:
##
@@ -80,25 +80,26 @@ class ScalaEnumSerializerSnapshot[E <: Enumeration] extends 
TypeSerializerSnapsh
 new EnumValueSerializer(enumObject)
   }
 
-  override def resolveSchemaCompatibility(
-  newSerializer: TypeSerializer[E#Value]): 
TypeSerializerSchemaCompatibility[E#Value] = {
+  override def resolveSchemaCompatibility(oldSerializerSnapshot: 
TypeSerializerSnapshot[E#Value])
+  : TypeSerializerSchemaCompatibility[E#Value] = {
 
 Preconditions.checkState(enumClass != null)
-Preconditions.checkState(previousEnumConstants != null)
+Preconditions.checkState(enumConstants != null)
 
-if (!newSerializer.isInstanceOf[EnumValueSerializer[E]]) {
+if (!oldSerializerSnapshot.isInstanceOf[ScalaEnumSerializerSnapshot[E]]) {
   return TypeSerializerSchemaCompatibility.incompatible()
 }
 
-val newEnumSerializer = newSerializer.asInstanceOf[EnumValueSerializer[E]]
-if (!enumClass.equals(newEnumSerializer.enum.getClass)) {
+val oldEnumSerializerSnapshot =
+  oldSerializerSnapshot.asInstanceOf[ScalaEnumSerializerSnapshot[E]]
+if (!enumClass.equals(oldEnumSerializerSnapshot.enumClass)) {
   return TypeSerializerSchemaCompatibility.incompatible()
 }
 
-for ((previousEnumName, index) <- previousEnumConstants) {
+for ((enumName, index) <- oldEnumSerializerSnapshot.enumConstants) {
   try {
-val newEnumName = newEnumSerializer.enum(index).toString
-if (previousEnumName != newEnumName) {
+val oldEnumName = enumConstants(index)._1

Review Comment:
   ```suggestion
   val enumName = enumConstants(index)._1
   ```






[GitHub] [flink] dawidwys commented on a diff in pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one

2023-01-19 Thread GitBox


dawidwys commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1081540937


##
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala:
##
@@ -80,25 +80,26 @@ class ScalaEnumSerializerSnapshot[E <: Enumeration] extends 
TypeSerializerSnapsh
 new EnumValueSerializer(enumObject)
   }
 
-  override def resolveSchemaCompatibility(
-  newSerializer: TypeSerializer[E#Value]): 
TypeSerializerSchemaCompatibility[E#Value] = {
+  override def resolveSchemaCompatibility(oldSerializerSnapshot: 
TypeSerializerSnapshot[E#Value])
+  : TypeSerializerSchemaCompatibility[E#Value] = {
 
 Preconditions.checkState(enumClass != null)
-Preconditions.checkState(previousEnumConstants != null)
+Preconditions.checkState(enumConstants != null)
 
-if (!newSerializer.isInstanceOf[EnumValueSerializer[E]]) {
+if (!oldSerializerSnapshot.isInstanceOf[ScalaEnumSerializerSnapshot[E]]) {
   return TypeSerializerSchemaCompatibility.incompatible()
 }
 
-val newEnumSerializer = newSerializer.asInstanceOf[EnumValueSerializer[E]]
-if (!enumClass.equals(newEnumSerializer.enum.getClass)) {
+val oldEnumSerializerSnapshot =
+  oldSerializerSnapshot.asInstanceOf[ScalaEnumSerializerSnapshot[E]]
+if (!enumClass.equals(oldEnumSerializerSnapshot.enumClass)) {
   return TypeSerializerSchemaCompatibility.incompatible()
 }
 
-for ((previousEnumName, index) <- previousEnumConstants) {
+for ((enumName, index) <- oldEnumSerializerSnapshot.enumConstants) {

Review Comment:
   ```suggestion
   for ((oldEnumName, index) <- oldEnumSerializerSnapshot.enumConstants) {
   ```






[GitHub] [flink] dawidwys commented on a diff in pull request #21635: [FLINK-30613] Improve resolving schema compatibility -- Milestone one

2023-01-19 Thread GitBox


dawidwys commented on code in PR #21635:
URL: https://github.com/apache/flink/pull/21635#discussion_r1081536771


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/RowDataSerializer.java:
##
@@ -328,26 +327,27 @@ public void readSnapshot(int readVersion, DataInputView 
in, ClassLoader userCode
 @Override
 public RowDataSerializer restoreSerializer() {
 return new RowDataSerializer(
-previousTypes,
-
nestedSerializersSnapshotDelegate.getRestoredNestedSerializers());
+types, 
nestedSerializersSnapshotDelegate.getRestoredNestedSerializers());
 }
 
 @Override
 public TypeSerializerSchemaCompatibility 
resolveSchemaCompatibility(
-TypeSerializer newSerializer) {
-if (!(newSerializer instanceof RowDataSerializer)) {
+TypeSerializerSnapshot oldSerializerSnapshot) {
+if (!(oldSerializerSnapshot instanceof RowDataSerializerSnapshot)) 
{
 return TypeSerializerSchemaCompatibility.incompatible();
 }
 
-RowDataSerializer newRowSerializer = (RowDataSerializer) 
newSerializer;
-if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
+RowDataSerializerSnapshot oldRowDataSerializerSnapshot =
+(RowDataSerializerSnapshot) oldSerializerSnapshot;
+if (!Arrays.equals(types, oldRowDataSerializerSnapshot.types)) {
 return TypeSerializerSchemaCompatibility.incompatible();
 }
 
 
CompositeTypeSerializerUtil.IntermediateCompatibilityResult
 intermediateResult =
 
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(
-newRowSerializer.fieldSerializers,
+
oldRowDataSerializerSnapshot.nestedSerializersSnapshotDelegate

Review Comment:
   shouldn't the order of serializers be reversed here?






[GitHub] [flink-connector-aws] boring-cyborg[bot] commented on pull request #36: [FLINK-30378][docs] Use modified sql_connector_download_table shortcode

2023-01-19 Thread GitBox


boring-cyborg[bot] commented on PR #36:
URL: 
https://github.com/apache/flink-connector-aws/pull/36#issuecomment-1397233856

   Awesome work, congrats on your first merged pull request!
   





[GitHub] [flink-connector-aws] zentol merged pull request #36: [FLINK-30378][docs] Use modified sql_connector_download_table shortcode

2023-01-19 Thread GitBox


zentol merged PR #36:
URL: https://github.com/apache/flink-connector-aws/pull/36





[GitHub] [flink] zentol merged pull request #21492: [FLINK-30378][docs] Add v2 sql_connector_download_table shortcode

2023-01-19 Thread GitBox


zentol merged PR #21492:
URL: https://github.com/apache/flink/pull/21492





[GitHub] [flink] zentol commented on pull request #17834: [FLINK-24941][datadog] Support Boolean gauges

2023-01-19 Thread GitBox


zentol commented on PR #17834:
URL: https://github.com/apache/flink/pull/17834#issuecomment-1397207313

   So gauges are a bit annoying. Ideally we would only have Number gauges, 
because really nothing else is properly supported by a majority of systems.
   
   Boolean gauges are usually a mistake from the get-go as well.
   Let's take the `isBackpressured` metric. This metric tells you whether the 
task is back-pressured right now, at this very moment.
   That's a terrible metric to base any decision on; you should rather use 
`backPressuredTimeMsPerSecond`, because it's not susceptible to bad luck. 
`isBackpressured` is only accurate if you are either 100% or 0% back-pressured; 
for everything in-between it's quite inaccurate (especially since the sampling 
interval is the reporting interval, i.e. typically on the order of seconds).
   
   That's why the PR didn't receive any attention. In a way it would only 
enable users to rely on bad metrics. Sure, consistency across reporters isn't a 
bad argument, but this consistency should still provide some real value.
   
   Mapping booleans to ints isn't necessarily sound either, because 
aggregating them isn't obvious.
   If we really wanted to support boolean gauges, we'd ideally map them to a 
distribution, I guess.
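   The point about preferring time-based numeric metrics over boolean samples can be sketched in plain Java. This is a hypothetical illustration, not Flink's actual implementation: instead of a Boolean gauge that samples the condition once per report, track how long the condition held during the interval and expose that as a number, in the spirit of `backPressuredTimeMsPerSecond`. The class and method names below are made up for the sketch.

```java
/**
 * Hypothetical sketch (not Flink's implementation): report "milliseconds the
 * condition held, per second of interval" as a numeric gauge value, instead
 * of a Boolean gauge that only samples the condition at report time.
 */
final class TimeRatioGauge {
    private final long intervalMs; // length of the reporting interval
    private long conditionTrueMs;  // time the condition held in this interval

    TimeRatioGauge(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Called whenever a period in which the condition held ends. */
    void record(long durationMs) {
        conditionTrueMs += durationMs;
    }

    /** Reported value: ms the condition held, per second of the interval. */
    long getMsPerSecond() {
        return conditionTrueMs * 1000 / intervalMs;
    }
}
```

   Unlike a point-in-time boolean, this value stays meaningful for partial back-pressure (e.g. 250 ms/s) and can be aggregated across subtasks.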


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-cassandra] echauchot commented on a diff in pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-19 Thread GitBox


echauchot commented on code in PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#discussion_r1081463284


##
flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java:
##
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.cassandra.source.split;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/** Serializer for {@link CassandraSplit}. */
+public class CassandraSplitSerializer implements 
SimpleVersionedSerializer {
+
+public static final CassandraSplitSerializer INSTANCE = new 
CassandraSplitSerializer();
+
+public static final int CURRENT_VERSION = 0;
+
+private CassandraSplitSerializer() {}
+
+@Override
+public int getVersion() {
+return CURRENT_VERSION;
+}
+
+@Override
+public byte[] serialize(CassandraSplit cassandraSplit) throws IOException {
+try (ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+ObjectOutputStream objectOutputStream =
+new ObjectOutputStream(byteArrayOutputStream)) {
+objectOutputStream.writeObject(cassandraSplit);

Review Comment:
   Makes sense
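   For context, the usual alternative to Java object serialization in a `SimpleVersionedSerializer` is to write each field explicitly with a `DataOutputStream`, which keeps the wire format under the serializer's versioned control. The sketch below is hypothetical: the two `long` fields are stand-ins, not `CassandraSplit`'s actual layout, and this is not the connector's real format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of explicit, version-controlled serialization; the
// (rangeStart, rangeEnd) fields are stand-ins for the split's real fields.
final class SplitBytes {
    static byte[] serialize(long rangeStart, long rangeEnd) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeLong(rangeStart);
            out.writeLong(rangeEnd);
        }
        return bos.toByteArray();
    }

    static long[] deserialize(byte[] bytes) throws IOException {
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bytes))) {
            return new long[] {in.readLong(), in.readLong()};
        }
    }
}
```

   Writing fields explicitly avoids tying the persisted bytes to the class's Java-serialization form, so the class can evolve as long as the versioned format is handled.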






[GitHub] [flink] luoyuxia commented on a diff in pull request #21703: [FLINK-29880][hive] Introduce auto compaction for Hive sink in batch mode

2023-01-19 Thread GitBox


luoyuxia commented on code in PR #21703:
URL: https://github.com/apache/flink/pull/21703#discussion_r1081459459


##
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableCompactSinkITCase.java:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT case for Hive table compaction in batch mode. */
+public class HiveTableCompactSinkITCase {
+
+@RegisterExtension
+private static final MiniClusterExtension MINI_CLUSTER = new 
MiniClusterExtension();
+
+private TableEnvironment tableEnv;
+private HiveCatalog hiveCatalog;
+private String warehouse;
+
+@BeforeEach
+public void setUp() {
+hiveCatalog = HiveTestUtils.createHiveCatalog();
+hiveCatalog.open();
+warehouse = 
hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
+tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+tableEnv.useCatalog(hiveCatalog.getName());
+}
+
+@AfterEach
+public void tearDown() {
+if (hiveCatalog != null) {
+hiveCatalog.close();
+}
+}
+
+@Test
+public void testNoCompaction() throws Exception {

Review Comment:
   Indeed, I didn't intend to add a case to cover `ALL_EXCHANGES_BLOCKING`, as 
it'll increase the test time. We always try to reduce the test time, since the 
Hive module already costs a lot of it.
   Also, from the side of the file compaction pipeline, the shuffle mode makes 
no difference. And if we cover these two cases, what about the other shuffle 
modes?






[GitHub] [flink] healchow commented on a diff in pull request #21726: [FLINK-30748][docs]Translate "Overview" page of "Querys" into Chinese

2023-01-19 Thread GitBox


healchow commented on code in PR #21726:
URL: https://github.com/apache/flink/pull/21726#discussion_r1081454537


##
docs/content.zh/docs/dev/table/sql/queries/overview.md:
##
@@ -155,110 +158,116 @@ sink_descriptor = 
TableDescriptor.for_connector("filesystem")
 
 t_env.create_temporary_table("RubberOrders", sink_descriptor)
 
-# run an INSERT SQL on the Table and emit the result to the TableSink
+# 在表上执行一个 INSERT SQL ,SQL结果写入到TableSink
 table_env \
 .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders 
WHERE product LIKE '%Rubber%'")
 ```
+
 {{< /tab >}}
 {{< /tabs >}}
 
 {{< top >}}
 
-## Execute a Query
+## 执行查询
 
+通过`TableEnvironment.executeSql()`方法可以执行`SELECT`或`VALUES`语句,并把结果收集到本地.它将`SELECT`语句(或`VALUES`语句)的结果作为`TableResult`返回.和`SELECT`语句相似,`Table.execute()`方法可以执行`Table`对象,并把结果收集到本地客户端.
+`TableResult.collect()`方法返回一个可关闭的行迭代器(row 
iterator).除非所有结果数据都被收集完成了,否则`SELECT`作业不会停止,所以应该积极使用`CloseableIterator#close()`方法关闭作业,以防止资源泄露.
 
`TableResult.print()`可以打印`SELECT`的结果到客户端的控制台中.`TableResult`上的结果数据只能被访问一次.因此`collect()`和`print()`只能二选一.
 
-A SELECT statement or a VALUES statement can be executed to collect the 
content to local through the `TableEnvironment.executeSql()` method. The method 
returns the result of the SELECT statement (or the VALUES statement) as a 
`TableResult`. Similar to a SELECT statement, a `Table` object can be executed 
using the `Table.execute()` method to collect the content of the query to the 
local client.
-`TableResult.collect()` method returns a closeable row iterator. The select 
job will not be finished unless all result data has been collected. We should 
actively close the job to avoid resource leak through the 
`CloseableIterator#close()` method. 
-We can also print the select result to client console through the 
`TableResult.print()` method. The result data in `TableResult` can be accessed 
only once. Thus, `collect()` and `print()` must not be called after each other.
+`TableResult.collect()` 和 
`TableResult.print()`在不同的checkpointing设置下有一些差异.(流式作业开启checkpointing, 参见 
\[checkpointing 设置]\({{< ref "docs/deployment/config" >}}#checkpointing)).
 
-`TableResult.collect()` and `TableResult.print()` have slightly different 
behaviors under different checkpointing settings (to enable checkpointing for a 
streaming job, see [checkpointing config]({{< ref "docs/deployment/config" 
>}}#checkpointing)).
-* For batch jobs or streaming jobs without checkpointing, 
`TableResult.collect()` and `TableResult.print()` have neither exactly-once nor 
at-least-once guarantee. Query results are immediately accessible by the 
clients once they're produced, but exceptions will be thrown when the job fails 
and restarts.
-* For streaming jobs with exactly-once checkpointing, `TableResult.collect()` 
and `TableResult.print()` guarantee an end-to-end exactly-once record delivery. 
A result will be accessible by clients only after its corresponding checkpoint 
completes.
-* For streaming jobs with at-least-once checkpointing, `TableResult.collect()` 
and `TableResult.print()` guarantee an end-to-end at-least-once record 
delivery. Query results are immediately accessible by the clients once they're 
produced, but it is possible for the same result to be delivered multiple times.
+*   对于没有checkpointing的批式或流式作业, `TableResult.collect()` 和 `TableResult.print()` 
既不保证精确一次(exactly-once)也不保证至少一次(at-least-once) 
.查询结果一旦产生,客户端可以立即访问,但是,作业失败或重启将抛出异常.

Review Comment:
   Suggestion: use Chinese punctuation, and add a space between Chinese and English text, to improve readability. For example:
   ```
   *   对于没有 checkpointing 的批式或流式作业,`TableResult.collect()` 和 
`TableResult.print()` 
既不保证精确一次(exactly-once)也不保证至少一次(at-least-once)。查询结果一旦产生,客户端可以立即访问,但是作业失败或重启时将会抛出异常。
   ```






[GitHub] [flink] healchow commented on a diff in pull request #21726: [FLINK-30748][docs]Translate "Overview" page of "Querys" into Chinese

2023-01-19 Thread GitBox


healchow commented on code in PR #21726:
URL: https://github.com/apache/flink/pull/21726#discussion_r1081453260


##
docs/content.zh/docs/dev/table/sql/queries/overview.md:
##
@@ -155,110 +158,116 @@ sink_descriptor = 
TableDescriptor.for_connector("filesystem")
 
 t_env.create_temporary_table("RubberOrders", sink_descriptor)
 
-# run an INSERT SQL on the Table and emit the result to the TableSink
+# 在表上执行一个 INSERT SQL ,SQL结果写入到TableSink
 table_env \
 .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders 
WHERE product LIKE '%Rubber%'")
 ```
+
 {{< /tab >}}
 {{< /tabs >}}
 
 {{< top >}}
 
-## Execute a Query
+## 执行查询
 
+通过`TableEnvironment.executeSql()`方法可以执行`SELECT`或`VALUES`语句,并把结果收集到本地.它将`SELECT`语句(或`VALUES`语句)的结果作为`TableResult`返回.和`SELECT`语句相似,`Table.execute()`方法可以执行`Table`对象,并把结果收集到本地客户端.
+`TableResult.collect()`方法返回一个可关闭的行迭代器(row 
iterator).除非所有结果数据都被收集完成了,否则`SELECT`作业不会停止,所以应该积极使用`CloseableIterator#close()`方法关闭作业,以防止资源泄露.
 
`TableResult.print()`可以打印`SELECT`的结果到客户端的控制台中.`TableResult`上的结果数据只能被访问一次.因此`collect()`和`print()`只能二选一.
 
-A SELECT statement or a VALUES statement can be executed to collect the 
content to local through the `TableEnvironment.executeSql()` method. The method 
returns the result of the SELECT statement (or the VALUES statement) as a 
`TableResult`. Similar to a SELECT statement, a `Table` object can be executed 
using the `Table.execute()` method to collect the content of the query to the 
local client.
-`TableResult.collect()` method returns a closeable row iterator. The select 
job will not be finished unless all result data has been collected. We should 
actively close the job to avoid resource leak through the 
`CloseableIterator#close()` method. 
-We can also print the select result to client console through the 
`TableResult.print()` method. The result data in `TableResult` can be accessed 
only once. Thus, `collect()` and `print()` must not be called after each other.
+`TableResult.collect()` 和 
`TableResult.print()`在不同的checkpointing设置下有一些差异.(流式作业开启checkpointing, 参见 
[checkpointing 设置]({{< ref "docs/deployment/config" >}}#checkpointing)).
 
-`TableResult.collect()` and `TableResult.print()` have slightly different 
behaviors under different checkpointing settings (to enable checkpointing for a 
streaming job, see [checkpointing config]({{< ref "docs/deployment/config" 
>}}#checkpointing)).
-* For batch jobs or streaming jobs without checkpointing, 
`TableResult.collect()` and `TableResult.print()` have neither exactly-once nor 
at-least-once guarantee. Query results are immediately accessible by the 
clients once they're produced, but exceptions will be thrown when the job fails 
and restarts.
-* For streaming jobs with exactly-once checkpointing, `TableResult.collect()` 
and `TableResult.print()` guarantee an end-to-end exactly-once record delivery. 
A result will be accessible by clients only after its corresponding checkpoint 
completes.
-* For streaming jobs with at-least-once checkpointing, `TableResult.collect()` 
and `TableResult.print()` guarantee an end-to-end at-least-once record 
delivery. Query results are immediately accessible by the clients once they're 
produced, but it is possible for the same result to be delivered multiple times.
+*   对于没有checkpointing的批式或流式作业, `TableResult.collect()` 和 `TableResult.print()` 
既不保证精确一次(exactly-once)也不保证至少一次(at-least-once) 
.查询结果一旦产生,客户端可以立即访问,但是,作业失败或重启将抛出异常.
+*   对于checkpointing设置为精确一次(exactly-once)的流式作业, `TableResult.collect()` 和 
`TableResult.print()` 保证端到端(end-to-end)的数据只传递一次.相应的checkpoint完成后,客户端才能访问结果.
+*   对于checkpointing设置为至少一次(at-least-once)的流式作业, `TableResult.collect()` 和 
`TableResult.print()` 保证端到端(end-to-end)的数据至少传递一次. 
查询结果一旦产生,客户端可以立即访问,但是可能会有同一条数据出现多次的情况.
 
 {{< tabs "88a003e1-16ea-43cc-9d42-d43ef1351e53" >}}
 {{< tab "Java" >}}
+
 ```java
 StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
 
 tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, 
amount INT) WITH (...)");
 
-// execute SELECT statement
+// 执行`SELECT`语句
 TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
-// use try-with-resources statement to make sure the iterator will be closed 
automatically
+// 使用 try-with-resources 确保iterator会自动关闭
try (CloseableIterator<Row> it = tableResult1.collect()) {
 while(it.hasNext()) {
 Row row = it.next();
-// handle row
+// 处理数据
 }
 }
 
-// execute Table
+// 执行表
 TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
 tableResult2.print();
 
 ```
+
 {{< /tab >}}
 {{< tab "Scala" >}}
+
 ```scala
 val env = StreamExecutionEnvironment.getExecutionEnvironment()
 val tableEnv = StreamTableEnvironment.create(env, settings)
-// enable checkpointing
+// 启用 checkpointing
 tableEnv.getConfig
   .set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, 

[GitHub] [flink] zentol merged pull request #21723: [FLINK-30749][security][runtime] Fix delegation token provider enabled flag documentation

2023-01-19 Thread GitBox


zentol merged PR #21723:
URL: https://github.com/apache/flink/pull/21723





[GitHub] [flink-connector-cassandra] echauchot commented on pull request #3: [FLINK-26822] Add Cassandra Source

2023-01-19 Thread GitBox


echauchot commented on PR #3:
URL: 
https://github.com/apache/flink-connector-cassandra/pull/3#issuecomment-1397142509

   > > I did not author the RingRange and SplitsGenerator classes. I got them 
from the Apache Beam Cassandra connector.
   > 
   > If you're copying stuff from other projects, please add some 
notice/attribution to the files and update the Flink source notice accordingly.
   
   ah yes I forgot this Flink guideline. I don't think it is an ASF rule but 
fair enough, I'll add it to the javadoc and notice.
   
   > 
   > > Back in 2017 I coded a [splitter for Cassandra Beam 
connector](https://github.com/echauchot/beam/blob/bfa33b85b6b310556ffa5c44c99bef50575b2c56/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java#L346)
 that works [with 
tokens](https://github.com/echauchot/beam/blob/BEAM-245-CassandraIO/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/DataSizeEstimates.java)
 also but that is simpler and supports all the Cassandra partitionners. Would 
you prefer that we use this other approach ?
   > 
   > Not sure? Why didn't it make it into Beam? Do you know why the Beam code 
is written the way it is?
   
   Actually, another splitting approach was chosen by the reviewer in 2017. 
But shortly after, another author changed the splitting to something 
similar to my 2017 token-based code. So when I thought about coding the split 
for the Flink connector, I decided to take the version of the code that was 
merged to Beam master. But it is true that it is over-complicated, redundant, 
and does not support the non-default Cassandra partitioners. The approach I had 
in 2017 was the same as that of the Cassandra Spark connector written by 
DataStax (tokens + Cassandra size-estimate statistics). So I think I'll try to 
reuse this code, migrate it to Flink, update it to the latest Cassandra 
version, and push it in this PR. 
   WDYT?
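
A quick illustration for readers following this thread: the token-based splitting echauchot mentions amounts to dividing the partitioner's token ring into contiguous ranges, one per split. Below is a minimal sketch, assuming the Murmur3Partitioner token range of [-2^63, 2^63 - 1]; the names and the plain even split are illustrative only, not the connector's actual code.

```python
# Sketch: split Cassandra's Murmur3Partitioner token ring into N contiguous
# ranges. Illustrative only; the real connectors also use size estimates.

MIN_TOKEN = -(2**63)   # Murmur3Partitioner minimum token
MAX_TOKEN = 2**63 - 1  # Murmur3Partitioner maximum token

def split_ring(num_splits, min_token=MIN_TOKEN, max_token=MAX_TOKEN):
    """Divide the token ring into num_splits contiguous [start, end) ranges."""
    total = max_token - min_token + 1
    width, remainder = divmod(total, num_splits)
    ranges, start = [], min_token
    for i in range(num_splits):
        # Spread the remainder over the first `remainder` ranges.
        end = start + width + (1 if i < remainder else 0)
        ranges.append((start, end))
        start = end
    return ranges

ranges = split_ring(4)
assert ranges[0][0] == MIN_TOKEN
assert ranges[-1][1] == MAX_TOKEN + 1
# Adjacent ranges are contiguous, so the whole ring is covered exactly once.
assert all(ranges[i][1] == ranges[i + 1][0] for i in range(len(ranges) - 1))
```

Each range can then be read with a `WHERE token(pk) >= start AND token(pk) < end` query, which is the general shape of token-based splitting in both the Beam and Spark connectors.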
   





[GitHub] [flink] LadyForest commented on pull request #21717: [FLINK-29945][sql-client] Supports to submit SQL to a embedded SQL Ga…

2023-01-19 Thread GitBox


LadyForest commented on PR #21717:
URL: https://github.com/apache/flink/pull/21717#issuecomment-1397141360

   P.S. A wrong command will get the CLI blocked forever. The stack trace is attached:
   
   ```text
   Flink SQL> desc extended foo;
   
   
   Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Could not read from command line.
   at 
org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:221)
   at 
org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:186)
   at 
org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:132)
   at 
org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:157)
   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:102)
   at 
org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:193)
   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:167)
   Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: 
Failed to get response.
   at 
org.apache.flink.table.client.gateway.ExecutorImpl.getResponse(ExecutorImpl.java:325)
   at 
org.apache.flink.table.client.gateway.ExecutorImpl.getSessionConfig(ExecutorImpl.java:184)
   at 
org.apache.flink.table.client.cli.CliClient.printExecutionException(CliClient.java:408)
   at 
org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:272)
   at 
org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:206)
   ... 6 more
   Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.rest.handler.RestHandlerException: Failed to 
getSessionConfig.
   at 
org.apache.flink.table.gateway.rest.handler.session.GetSessionConfigHandler.handleRequest(GetSessionConfigHandler.java:66)
   at 
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
   at 
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
   at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
   at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
   at java.util.Optional.ifPresent(Optional.java:159)
   at 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
   at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
   at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
   at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
   at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
   at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
   at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
   at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
   at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
   at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
   at 

[GitHub] [flink] LadyForest commented on a diff in pull request #21717: [FLINK-29945][sql-client] Supports to submit SQL to a embedded SQL Ga…

2023-01-19 Thread GitBox


LadyForest commented on code in PR #21717:
URL: https://github.com/apache/flink/pull/21717#discussion_r1081032613


##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImpl.java:
##
@@ -18,35 +18,136 @@
 
 package org.apache.flink.table.client.cli.parser;
 
-import org.apache.flink.table.api.SqlParserException;
-import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImplTokenManager;
+import org.apache.flink.sql.parser.impl.SimpleCharStream;
+import org.apache.flink.sql.parser.impl.Token;
+import org.apache.flink.table.api.SqlParserEOFException;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
-import org.apache.flink.table.operations.Operation;
 
+import java.io.StringReader;
+import java.util.Iterator;
 import java.util.Optional;
 
-/** SqlCommandParserImpl wrappers an {@link Executor} supports parse a 
statement to an Operation. */
-public class SqlCommandParserImpl implements SqlCommandParser {
-private final Executor executor;
+import static org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants.EOF;
+import static 
org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants.IDENTIFIER;
+import static 
org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants.SEMICOLON;
 
-public SqlCommandParserImpl(Executor executor) {
-this.executor = executor;
-}
+/**
+ * The {@link SqlCommandParserImpl} uses {@link 
FlinkSqlParserImplTokenManager} to do lexical
+ * analysis. It cannot recognize special hive keywords yet because Hive has a 
slightly different
+ * vocabulary compared to Flink's, which causes the The {@link 
SqlCommandParserImpl}

Review Comment:
   Nit: remove duplicated "The"



##
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImpl.java:
##
@@ -18,35 +18,136 @@
 
 package org.apache.flink.table.client.cli.parser;
 
-import org.apache.flink.table.api.SqlParserException;
-import org.apache.flink.table.client.gateway.Executor;
+import org.apache.flink.sql.parser.impl.FlinkSqlParserImplTokenManager;
+import org.apache.flink.sql.parser.impl.SimpleCharStream;
+import org.apache.flink.sql.parser.impl.Token;
+import org.apache.flink.table.api.SqlParserEOFException;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
-import org.apache.flink.table.operations.Operation;
 
+import java.io.StringReader;
+import java.util.Iterator;
 import java.util.Optional;
 
-/** SqlCommandParserImpl wrappers an {@link Executor} supports parse a 
statement to an Operation. */
-public class SqlCommandParserImpl implements SqlCommandParser {
-private final Executor executor;
+import static org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants.EOF;
+import static 
org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants.IDENTIFIER;
+import static 
org.apache.flink.sql.parser.impl.FlinkSqlParserImplConstants.SEMICOLON;
 
-public SqlCommandParserImpl(Executor executor) {
-this.executor = executor;
-}
+/**
+ * The {@link SqlCommandParserImpl} uses {@link 
FlinkSqlParserImplTokenManager} to do lexical
+ * analysis. It cannot recognize special hive keywords yet because Hive has a 
slightly different
+ * vocabulary compared to Flink's, which causes the The {@link 
SqlCommandParserImpl}
+ * misunderstanding some Hive's keywords to IDENTIFIER. But the ClientParser 
is only responsible to
+ * check whether the statement is completed or not and only cares about a few 
statements. So it's
+ * acceptable to tolerate the inaccuracy here.
+ */
+public class SqlCommandParserImpl implements SqlCommandParser {
 
-@Override
-public Optional parseCommand(String stmt) throws 
SqlParserException {
+public Optional parseStatement(String statement) throws 
SqlExecutionException {
 // normalize
-stmt = stmt.trim();
+statement = statement.trim();
 // meet empty statement, e.g ";\n"
-if (stmt.isEmpty() || stmt.equals(";")) {
+if (statement.isEmpty() || statement.equals(";")) {
 return Optional.empty();
+} else {
+return Optional.of(getCommand(new 
TokenIterator(statement.trim(;

Review Comment:
   Nit: no need to `trim` again?
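
The class javadoc in this diff says the parser only needs to decide whether a statement is complete, tolerating some lexical inaccuracy. A toy sketch of that idea (not the actual `FlinkSqlParserImplTokenManager` logic): a statement counts as complete when it ends with a semicolon that sits outside string literals and `--` line comments.

```python
def is_statement_complete(statement):
    """True when the last significant character is a top-level ';'."""
    in_string = False
    last_significant = ""
    i = 0
    while i < len(statement):
        ch = statement[i]
        if in_string:
            if ch == "'":          # closing quote ends the literal
                in_string = False
            i += 1
            continue
        if ch == "'":              # opening quote starts a literal
            in_string = True
            i += 1
        elif statement.startswith("--", i):
            nl = statement.find("\n", i)
            if nl == -1:           # comment runs to end of input
                break
            i = nl                 # resume scanning at the newline
        else:
            if not ch.isspace():
                last_significant = ch
            i += 1
    return last_significant == ";"

assert is_statement_complete("SELECT * FROM Orders;")
assert not is_statement_complete("SELECT * FROM Orders")
assert not is_statement_complete("SELECT ';' FROM Orders")   # ';' is in a literal
assert is_statement_complete("SELECT 1; -- trailing comment")
```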



##
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/parser/SqlCommandParserImplTest.java:
##
@@ -28,105 +28,99 @@
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.Optional;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
-import static 
org.apache.flink.table.client.cli.parser.StatementType.BEGIN_STATEMENT_SET;
-import static org.apache.flink.table.client.cli.parser.StatementType.CLEAR;
-import static org.apache.flink.table.client.cli.parser.StatementType.END;
-import static org.apache.flink.table.client.cli.parser.StatementType.EXPLAIN;
-import 

[GitHub] [flink] luoyuxia commented on a diff in pull request #21703: [FLINK-29880][hive] Introduce auto compaction for Hive sink in batch mode

2023-01-19 Thread GitBox


luoyuxia commented on code in PR #21703:
URL: https://github.com/apache/flink/pull/21703#discussion_r1081412503


##
docs/content/docs/connectors/table/hive/hive_read_write.md:
##
@@ -558,6 +558,70 @@ use more threads to speed the gathering.
 **NOTE:**
 - Only `BATCH` mode supports to auto gather statistic, `STREAMING` mode 
doesn't support it yet.
 
+### File Compaction
+
+The Hive sink also supports file compactions, which allows applications to 
reduce the number of files generated while writing into Hive.
+
+ Stream Mode
+
+In stream mode, the behavior is same to `FileSystem` sink. Please refer to 
[File Compaction]({{< ref "docs/connectors/table/filesystem" 
>}}#file-compaction) for more details.
+
+ Batch Mode
+
+When it's in batch mode and auto compaction is enabled, after finishing 
writing files, Flink will calculate the average size of written files for each 
partition. And if the average size is less than the
+threshold configured, Flink will then try to compact these files to files with 
a target size. The following is the table's options for file compactions.

Review Comment:
   I accept it, except that I still think we should use `a target size` instead 
of `the target size`.
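
The batch-mode behavior described in the diff above (compute the average size of the written files per partition, then compact when the average falls below a threshold, producing files of roughly a target size) can be sketched as follows. This is an illustrative model only, not Flink's implementation; the names and the greedy grouping are assumptions.

```python
def should_compact(file_sizes, avg_threshold):
    """Compact a partition's files when their average size is below the threshold."""
    return bool(file_sizes) and sum(file_sizes) / len(file_sizes) < avg_threshold

def plan_compaction(file_sizes, target_size):
    """Greedily group files so each compacted output is roughly target_size."""
    groups, current, current_size = [], [], 0
    for size in file_sizes:
        current.append(size)
        current_size += size
        if current_size >= target_size:
            groups.append(current)
            current, current_size = [], 0
    if current:
        groups.append(current)
    return groups

sizes = [2, 3, 4, 16]  # file sizes in MB, hypothetical
assert should_compact(sizes, avg_threshold=8)  # average is 6.25 MB
assert plan_compaction(sizes, target_size=8) == [[2, 3, 4], [16]]
```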






[GitHub] [flink] luoyuxia commented on a diff in pull request #21601: [FLINK-29720][hive] Supports native avg function for hive dialect

2023-01-19 Thread GitBox


luoyuxia commented on code in PR #21601:
URL: https://github.com/apache/flink/pull/21601#discussion_r1081360275


##
docs/content.zh/docs/connectors/table/hive/hive_functions.md:
##
@@ -73,6 +73,31 @@ Some Hive built-in functions in older versions have [thread 
safety issues](https
 We recommend users patch their own Hive to fix them.
 {{< /hint >}}
 
+## Use Native Hive Aggregate Functions via HiveModule
+
+For Hive's built-in aggregation function, Flink currently uses sort-based 
aggregation strategy. Compared to hash-based aggregation strategy, the 
performance is one to two times worse, so from Flink 1.17, we have implemented 
some of Hive's aggregation functions natively in Flink.
+These functions will use the hash-agg strategy to improve performance. 
Currently, only five functions are supported, namely sum/count/avg/min/max, and 
more aggregation functions will be supported in the future.
+Users can use the native aggregation function by turning on the option 
`table.exec.hive.native-agg-function.enabled`, which brings significant 
performance improvement to the job.

Review Comment:
   I think we should also remind users that when 
`table.exec.hive.native-agg-function.enabled` = `true`, there may be some 
incompatibility issues, e.g., some data types may not be supported by the 
native implementation.
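
For readers comparing the two strategies the diff mentions, here is a toy sketch of sort-based vs. hash-based aggregation (a per-key SUM). It only illustrates why hash aggregation avoids the up-front sort; it is not Flink's operator code.

```python
from itertools import groupby

def sort_based_agg(rows):
    """Sort by key, then fold each contiguous run of equal keys."""
    ordered = sorted(rows, key=lambda kv: kv[0])  # O(n log n) sort first
    return {k: sum(v for _, v in grp)
            for k, grp in groupby(ordered, key=lambda kv: kv[0])}

def hash_based_agg(rows):
    """Single pass over the input, accumulating into a hash table."""
    acc = {}
    for k, v in rows:
        acc[k] = acc.get(k, 0) + v
    return acc

rows = [("a", 1), ("b", 2), ("a", 3)]
assert sort_based_agg(rows) == hash_based_agg(rows) == {"a": 4, "b": 2}
```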



##
docs/content.zh/docs/connectors/table/hive/hive_functions.md:
##
@@ -73,6 +73,31 @@ Some Hive built-in functions in older versions have [thread 
safety issues](https
 We recommend users patch their own Hive to fix them.
 {{< /hint >}}
 
+## Use Native Hive Aggregate Functions via HiveModule

Review Comment:
   Maybe we can add a link for `HiveModule`, like 
   [HiveModule]({{< ref "docs/dev/table/modules" >}}#hivemodule)
   



##
docs/content.zh/docs/connectors/table/hive/hive_functions.md:
##
@@ -73,6 +73,31 @@ Some Hive built-in functions in older versions have [thread 
safety issues](https
 We recommend users patch their own Hive to fix them.
 {{< /hint >}}
 
+## Use Native Hive Aggregate Functions via HiveModule
+
+For Hive's built-in aggregation function, Flink currently uses sort-based 
aggregation strategy. Compared to hash-based aggregation strategy, the 
performance is one to two times worse, so from Flink 1.17, we have implemented 
some of Hive's aggregation functions natively in Flink.
+These functions will use the hash-agg strategy to improve performance. 
Currently, only five functions are supported, namely sum/count/avg/min/max, and 
more aggregation functions will be supported in the future.
+Users can use the native aggregation function by turning on the option 
`table.exec.hive.native-agg-function.enabled`, which brings significant 
performance improvement to the job.
+
+<table>
+  <thead>
+    <tr>
+      <th>Key</th>
+      <th>Default</th>
+      <th>Type</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>table.exec.hive.native-agg-function.enabled</td>
+      <td>false</td>
+      <td>Boolean</td>
+      <td>Enabling native aggregate function for hive dialect to use hash-agg strategy that can improve the aggregation performance. This is a job-level option, user can enable it per-job.</td>
+    </tr>
+  </tbody>
+</table>

Review Comment:
   May not be precise. It applies not only to the Hive dialect, but also 
whenever the HiveModule is loaded.
   ```suggestion
   Enabling the native aggregate functions to use the hash-agg 
strategy, which can improve aggregation performance after the 
HiveModule is loaded. This is a job-level option; users can enable it per job.
   ```



##
docs/content.zh/docs/connectors/table/hive/hive_functions.md:
##
@@ -73,6 +73,31 @@ Some Hive built-in functions in older versions have [thread 
safety issues](https
 We recommend users patch their own Hive to fix them.
 {{< /hint >}}
 
+## Use Native Hive Aggregate Functions via HiveModule

Review Comment:
   If HiveModule is loaded with a higher priority than CoreModule, Flink will 
try to use the Hive built-in function first. And then, for Hive built-in 
aggregation functions, Flink will use the sort-based aggregation strategy.
   
   So, I think the title `Use Native Hive Aggregate Functions via HiveModule` 
is not correct.
   Maybe it can be `Use Native Hive Aggregate Functions`, with some 
explanation added about it.
   Maybe we can put the sentence 
   `
   If HiveModule is loaded with a higher priority than CoreModule, Flink will 
try to use the Hive built-in function first. And then, for Hive built-in 
   aggregation functions, Flink will use the sort-based aggregation strategy.
   ` in here.
   





