[jira] [Created] (FLINK-29749) flink info command support dynamic properties

2022-10-24 Thread jackylau (Jira)
jackylau created FLINK-29749:


 Summary: flink info command support dynamic properties
 Key: FLINK-29749
 URL: https://issues.apache.org/jira/browse/FLINK-29749
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.17.0
Reporter: jackylau
 Fix For: 1.17.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] flinkbot commented on pull request #21144: [FLINK-29747] refactor: module-based app to standalone components

2022-10-24 Thread GitBox


flinkbot commented on PR #21144:
URL: https://github.com/apache/flink/pull/21144#issuecomment-1290004035

   
   ## CI report:
   
   * aa898fd31f50a69d8bad471df434fb02ab0f28fa UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29711) Topic notification not present in metadata after 60000 ms.

2022-10-24 Thread Durgesh Mishra (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623561#comment-17623561
 ] 

Durgesh Mishra commented on FLINK-29711:


[~martijnvisser]  and [~mason6345]  Can you guys help us here to fix this 
problem ?

> Topic notification not present in metadata after 6 ms.
> --
>
> Key: FLINK-29711
> URL: https://issues.apache.org/jira/browse/FLINK-29711
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.4, 1.14.6
>Reporter: Durgesh Mishra
>Priority: Major
>
> Failed to send data to Kafka null with 
> FlinkKafkaInternalProducer\{transactionalId='null', inTransaction=false, 
> closed=false}
> at 
> org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405)
> at 
> org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.base/java.lang.Thread.run(Unknown Source)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29747) [UI] Refactor runtime web from module-based to standalone components

2022-10-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29747:
---
Labels: pull-request-available  (was: )

> [UI] Refactor runtime web from module-based to standalone components
> 
>
> Key: FLINK-29747
> URL: https://issues.apache.org/jira/browse/FLINK-29747
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Reporter: Junhan Yang
>Assignee: Junhan Yang
>Priority: Major
>  Labels: pull-request-available
>
> From v14 onwards, Angular provides a capability of standalone components that 
> can be independently bootstrapping. This is a powerful feature in terms of 
> refactoring the application to be less-heavy and structurally clean. It also 
> enables the component-level lazy loading in routes, improving the web 
> performance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] yangjunhan opened a new pull request, #21144: [FLINK-29747] refactor: module-based app to standalone components

2022-10-24 Thread GitBox


yangjunhan opened a new pull request, #21144:
URL: https://github.com/apache/flink/pull/21144

   
   
   ## What is the purpose of the change
   
   FLINK-29747
   
   
   ## Brief change log
   
   Refactor module-based app to standalone components with lazy loading.
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-table-store] JingsongLi commented on a diff in pull request #328: [FLINK-29714] Merge TableWrite and TableCompact into one interface

2022-10-24 Thread GitBox


JingsongLi commented on code in PR #328:
URL: https://github.com/apache/flink-table-store/pull/328#discussion_r1004006577


##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java:
##
@@ -137,6 +137,11 @@ public void write(KeyValue kv) throws Exception {
 }
 }
 
+@Override
+public void compact() throws Exception {

Review Comment:
   Can we just use `fullCompact`?



##
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java:
##
@@ -252,9 +257,9 @@ private void updateCompactResult(CompactResult result) {
 compactAfter.addAll(result.after());
 }
 
-private void submitCompaction() throws Exception {
-trySyncLatestCompaction(false);
-compactManager.triggerCompaction();
+private void submitCompaction(boolean forcedCompaction) throws Exception {

Review Comment:
   `forcedCompaction` -> `forcedFullCompaction`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xintongsong commented on pull request #20910: [FLINK-29435]SecurityConfiguration supports dynamic configuration

2022-10-24 Thread GitBox


xintongsong commented on PR #20910:
URL: https://github.com/apache/flink/pull/20910#issuecomment-1289993309

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] xintongsong commented on a diff in pull request #21132: [FLINK-29134][metrics] Do not repeatedly add useless metric updating tasks to avoid wasting resources

2022-10-24 Thread GitBox


xintongsong commented on code in PR #21132:
URL: https://github.com/apache/flink/pull/21132#discussion_r1004004779


##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherImpl.java:
##
@@ -148,102 +156,106 @@ private void fetchMetrics() {
 },
 executor);
 
-CompletableFuture> 
queryServiceAddressesFuture =
-
leaderGateway.requestMetricQueryServiceAddresses(timeout);
-
-queryServiceAddressesFuture.whenCompleteAsync(
-(Collection queryServiceAddresses, Throwable 
throwable) -> {
+List> waitingMetricsFutures = new 
ArrayList<>();
+CompletableFuture jmMetricsFuture = 
queryJmMetricsFuture(leaderGateway);
+waitingMetricsFutures.add(jmMetricsFuture);
+jmMetricsFuture.whenCompleteAsync(
+(ignore, throwable) -> {
 if (throwable != null) {
-LOG.debug("Requesting paths for query services 
failed.", throwable);
-} else {
-for (String queryServiceAddress : 
queryServiceAddresses) {
-
retrieveAndQueryMetrics(queryServiceAddress);
-}
+LOG.debug("Failed to fetch the leader's 
metrics.", throwable);
 }
-},
-executor);
-
-// TODO: Once the old code has been ditched, remove the 
explicit TaskManager query
-// service discovery
-// TODO: and return it as part of 
requestMetricQueryServiceAddresses. Moreover,
-// change the MetricStore such that
-// TODO: we don't have to explicitly retain the valid 
TaskManagers, e.g. letting it
-// be a cache with expiry time
-CompletableFuture>>
-taskManagerQueryServiceGatewaysFuture =
-
leaderGateway.requestTaskManagerMetricQueryServiceAddresses(
-timeout);
-
-taskManagerQueryServiceGatewaysFuture.whenCompleteAsync(
-(Collection> 
queryServiceGateways,
-Throwable throwable) -> {
+});
+CompletableFuture tmMetricsFuture = 
queryTmMetricsFuture(leaderGateway);
+waitingMetricsFutures.add(tmMetricsFuture);
+tmMetricsFuture.whenCompleteAsync(
+(ignore, throwable) -> {
 if (throwable != null) {
-LOG.debug(
-"Requesting TaskManager's path for 
query services failed.",
-throwable);
-} else {
-List taskManagersToRetain =
-queryServiceGateways.stream()
-.map(
-(Tuple2 tuple) -> {
-
queryServiceRetriever
-
.retrieveService(tuple.f1)
-
.thenAcceptAsync(
-
this::queryMetrics,
-
executor);
-return 
tuple.f0.getResourceIdString();
-})
-.collect(Collectors.toList());
-
-
metrics.retainTaskManagers(taskManagersToRetain);
+LOG.debug("Failed to fetch the TaskManager's 
metrics.", throwable);

Review Comment:
   ```suggestion
   LOG.debug("Failed to fetch the TaskManager's 
metrics.", throwable);
   return FutureUtils.completedExceptionally(e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-29609) Clean up jobmanager deployment on suspend after recording savepoint info

2022-10-24 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora reassigned FLINK-29609:
--

Assignee: Sriram Ganesh

> Clean up jobmanager deployment on suspend after recording savepoint info
> 
>
> Key: FLINK-29609
> URL: https://issues.apache.org/jira/browse/FLINK-29609
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Assignee: Sriram Ganesh
>Priority: Major
> Fix For: kubernetes-operator-1.3.0
>
>
> Currently in case of suspending with savepoint. The jobmanager pod will 
> linger there forever after cancelling the job.
> This is currently used to ensure consistency in case the 
> operator/cancel-with-savepoint operation fails.
> Once we are sure however that the savepoint has been recorded and the job is 
> shut down, we should clean up all the resources. Optionally we can make this 
> configurable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29609) Clean up jobmanager deployment on suspend after recording savepoint info

2022-10-24 Thread Sriram Ganesh (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623554#comment-17623554
 ] 

Sriram Ganesh commented on FLINK-29609:
---

[~gyfora] - Please assign this to me.

> Clean up jobmanager deployment on suspend after recording savepoint info
> 
>
> Key: FLINK-29609
> URL: https://issues.apache.org/jira/browse/FLINK-29609
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Major
> Fix For: kubernetes-operator-1.3.0
>
>
> Currently in case of suspending with savepoint. The jobmanager pod will 
> linger there forever after cancelling the job.
> This is currently used to ensure consistency in case the 
> operator/cancel-with-savepoint operation fails.
> Once we are sure however that the savepoint has been recorded and the job is 
> shut down, we should clean up all the resources. Optionally we can make this 
> configurable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] TanYuxin-tyx commented on pull request #21132: [FLINK-29134][metrics] Do not repeatedly add useless metric updating tasks to avoid wasting resources

2022-10-24 Thread GitBox


TanYuxin-tyx commented on PR #21132:
URL: https://github.com/apache/flink/pull/21132#issuecomment-1289958870

   @xintongsong @reswqa Thanks for reviewing the change. I have addressed the 
comments. Could you take a look again?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21143: [hotfix] Fix some typo in HiveOptions.

2022-10-24 Thread GitBox


flinkbot commented on PR #21143:
URL: https://github.com/apache/flink/pull/21143#issuecomment-1289954801

   
   ## CI report:
   
   * a1c7b649082670669fdb9af042171c744e48e8b8 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #21132: [FLINK-29134][metrics] Do not repeatedly add useless metric updating tasks to avoid wasting resources

2022-10-24 Thread GitBox


TanYuxin-tyx commented on code in PR #21132:
URL: https://github.com/apache/flink/pull/21132#discussion_r1003979797


##
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##
@@ -263,7 +220,151 @@ public void testShortUpdateInterval() throws 
InterruptedException {
 
 fetcher.update();
 
-assertThat(requestMetricQueryServiceGatewaysCounter.get(), is(2));
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(2);
+}
+
+@Test
+public void testIgnoreUpdateRequestWhenFetchingMetrics() throws 
InterruptedException {
+final long updateInterval = 1000L;
+final long waitTimeBeforeReturnMetricResults = updateInterval * 2;
+final Time timeout = Time.seconds(10L);
+final AtomicInteger requestMetricQueryServiceGatewaysCounter = new 
AtomicInteger(0);
+final JobID jobID = new JobID();
+final ResourceID tmRID = ResourceID.generate();
+
+// Create metric fetcher
+MetricFetcher fetcher =
+createMetricFetcherWithServiceGateways(
+jobID,
+tmRID,
+timeout,
+updateInterval,
+waitTimeBeforeReturnMetricResults,
+requestMetricQueryServiceGatewaysCounter);
+
+fetcher.update();
+
+final long start = System.currentTimeMillis();
+long difference = 0L;
+
+while (difference <= updateInterval) {
+Thread.sleep((int) (updateInterval * 1.5f));
+difference = System.currentTimeMillis() - start;
+}
+
+fetcher.update();
+
+
assertThat(requestMetricQueryServiceGatewaysCounter.get()).isEqualTo(1);
+}
+
+@Nonnull
+private MetricFetcher createMetricFetcherWithServiceGateways(
+JobID jobID,
+ResourceID tmRID,
+Time timeout,
+long updateInterval,
+long waitTimeBeforeReturnMetricResults,
+@Nullable AtomicInteger requestMetricQueryServiceGatewaysCounter) {
+final ExecutorService executor = 
java.util.concurrent.Executors.newSingleThreadExecutor();
+// = setup QueryServices
+// 

+
+final MetricQueryServiceGateway jmQueryService =
+new TestingMetricQueryServiceGateway.Builder()
+.setQueryMetricsSupplier(
+() ->
+CompletableFuture.completedFuture(
+new MetricDumpSerialization
+
.MetricSerializationResult(
+new byte[0],
+new byte[0],
+new byte[0],
+new byte[0],
+0,
+0,
+0,
+0)))
+.build();
+
+MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer 
=
+createRequestDumpAnswer(tmRID, jobID);
+final MetricQueryServiceGateway tmQueryService =
+new TestingMetricQueryServiceGateway.Builder()
+.setQueryMetricsSupplier(
+() -> {
+if (waitTimeBeforeReturnMetricResults > 0) 
{
+CompletableFuture<
+MetricDumpSerialization
+
.MetricSerializationResult>
+metricsAnswerFuture = new 
CompletableFuture<>();
+CompletableFuture.completedFuture(null)
+.thenRunAsync(
+waitTimeMs(
+
waitTimeBeforeReturnMetricResults),
+executor)
+.whenCompleteAsync(
+(ignore, throwable) -> 
{
+if (throwable != 
null) {
+
fail(throwable.getMessage());
+}
+  

[GitHub] [flink] reswqa commented on pull request #21143: [hotfix] Fix some typo in HiveOptions.

2022-10-24 Thread GitBox


reswqa commented on PR #21143:
URL: https://github.com/apache/flink/pull/21143#issuecomment-1289950821

   cc @luoyuxia 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa opened a new pull request, #21143: [hotfix] Fix some typo in HiveOptions.

2022-10-24 Thread GitBox


reswqa opened a new pull request, #21143:
URL: https://github.com/apache/flink/pull/21143

   ## What is the purpose of the change
   
   *Fix some typo in HiveOptions.*
   
   
   ## Brief change log
   
 - *Using `withDescription` instead of `withDeprecatedKeys` for some config 
options.*
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #21142: [FLINK-29737][Table] Support DataGen on waveform function.

2022-10-24 Thread GitBox


flinkbot commented on PR #21142:
URL: https://github.com/apache/flink/pull/21142#issuecomment-1289946308

   
   ## CI report:
   
   * 000fb8030dc23459a3669eb62b54a58691641d06 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29737) Support DataGen on waveform function

2022-10-24 Thread chenzihao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623546#comment-17623546
 ] 

chenzihao commented on FLINK-29737:
---

[~fsk119] Thanks for your reply. I have opened the PR, please help to review 
it, thanks a lot.

> Support DataGen on waveform function
> 
>
> Key: FLINK-29737
> URL: https://issues.apache.org/jira/browse/FLINK-29737
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: chenzihao
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-10-24-16-09-47-386.png, 
> image-2022-10-24-16-09-52-410.png
>
>
> In some scenarios, we need to simulate flow changes in the production 
> environment. The current DATAGEN feature only supports data generation at a 
> constant rate. We try to simulate increments of flow using batch jobs, but 
> the production rate is not smooth, so I suggest that we can support sin-based 
> data generation in order to get smooth changes. 
> 1. add another batch job to simulate increments of flow.
> !image-2022-10-24-16-09-52-410.png!
> 2. sin-based.
> !image-2022-10-24-16-09-47-386.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29737) Support DataGen on waveform function

2022-10-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29737:
---
Labels: pull-request-available  (was: )

> Support DataGen on waveform function
> 
>
> Key: FLINK-29737
> URL: https://issues.apache.org/jira/browse/FLINK-29737
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: chenzihao
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-10-24-16-09-47-386.png, 
> image-2022-10-24-16-09-52-410.png
>
>
> In some scenarios, we need to simulate flow changes in the production 
> environment. The current DATAGEN feature only supports data generation at a 
> constant rate. We try to simulate increments of flow using batch jobs, but 
> the production rate is not smooth, so I suggest that we can support sin-based 
> data generation in order to get smooth changes. 
> 1. add another batch job to simulate increments of flow.
> !image-2022-10-24-16-09-52-410.png!
> 2. sin-based.
> !image-2022-10-24-16-09-47-386.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] chenzihao5 opened a new pull request, #21142: [FLINK-29737][Table] Support DataGen on waveform function.

2022-10-24 Thread GitBox


chenzihao5 opened a new pull request, #21142:
URL: https://github.com/apache/flink/pull/21142

   
   
   ## What is the purpose of the change
   
   Support DataGen on waveform function.
   
   ## Brief change log
   
   - add three parameters:
 - waveform-function-enabled: Apply the waveform function on the rate. The 
default value is false.
 - period: Period of waveform function. The default value is one hour.
 - floating-gap: The gap between the max(min) rate and the standard rate 
which is rows-per-second. The default value is 5000.
   - change the calculation method of taskRowsPerSecond based on sine function.
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no 
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29745) Split reader/writer factory for compaction in MergeTreeTest

2022-10-24 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-29745:
-
Fix Version/s: table-store-0.3.0
   (was: shammon)

> Split reader/writer factory for compaction in MergeTreeTest
> ---
>
> Key: FLINK-29745
> URL: https://issues.apache.org/jira/browse/FLINK-29745
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.2.2
>Reporter: Shammon
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29745) Split reader/writer factory for compaction in MergeTreeTest

2022-10-24 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-29745:


Assignee: Shammon

> Split reader/writer factory for compaction in MergeTreeTest
> ---
>
> Key: FLINK-29745
> URL: https://issues.apache.org/jira/browse/FLINK-29745
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.2.2
>Reporter: Shammon
>Assignee: Shammon
>Priority: Major
>  Labels: pull-request-available
> Fix For: table-store-0.3.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-29745) Split reader/writer factory for compaction in MergeTreeTest

2022-10-24 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee closed FLINK-29745.

Fix Version/s: shammon
   Resolution: Fixed

master: 537e8cf4b3733d93720381c65b30024870ece533

> Split reader/writer factory for compaction in MergeTreeTest
> ---
>
> Key: FLINK-29745
> URL: https://issues.apache.org/jira/browse/FLINK-29745
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.2.2
>Reporter: Shammon
>Priority: Major
>  Labels: pull-request-available
> Fix For: shammon
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] JingsongLi merged pull request #331: [FLINK-29745] Use independent reader/writer factory for compaction in MergeTreeTest

2022-10-24 Thread GitBox


JingsongLi merged PR #331:
URL: https://github.com/apache/flink-table-store/pull/331


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] cuibo01 commented on pull request #20910: [FLINK-29435]SecurityConfiguration supports dynamic configuration

2022-10-24 Thread GitBox


cuibo01 commented on PR #20910:
URL: https://github.com/apache/flink/pull/20910#issuecomment-1289929840

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29748) Expose the optimize phase in the connector context

2022-10-24 Thread Aitozi (Jira)
Aitozi created FLINK-29748:
--

 Summary: Expose the optimize phase in the connector context
 Key: FLINK-29748
 URL: https://issues.apache.org/jira/browse/FLINK-29748
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner, Table SQL / Runtime
Reporter: Aitozi


Currently, in the connector it can not know whether the whole optimize is 
finished.
When the optimize finished, the all information is static, eg: the reading 
partitions. If I want to validate the final optimized result (like whether the 
reading partition is too much or empty), it needs the context of what is the 
current phase. I think the {{ScanContext}} is ok to expose this information. 




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #160: [FLINK-29434] Add AlgoOperator for Splitter

2022-10-24 Thread GitBox


yunfengzhou-hub commented on code in PR #160:
URL: https://github.com/apache/flink-ml/pull/160#discussion_r1003935113


##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/randomsplitter/RandomSplitterParams.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.randomsplitter;
+
+import org.apache.flink.ml.param.DoubleArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidator;
+import org.apache.flink.ml.param.WithParams;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Params of {@link RandomSplitter}.
+ *
+ * @param  The class type of this instance.
+ */
+public interface RandomSplitterParams extends WithParams {
+Param FRACTIONS =
+new DoubleArrayParam(
+"fractions",

Review Comment:
   Do you think it would be more intuitive to use `weights` instead of 
`fractions`? `fractions` seems to be thresholds of the randomly generated 
doubles in the implementation detail of this algorithm, and I think `weights` 
might be simpler and easier to understand than `fractions`.



##
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/randomsplitter/RandomSplitterParams.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.randomsplitter;
+
+import org.apache.flink.ml.param.DoubleArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidator;
+import org.apache.flink.ml.param.WithParams;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Params of {@link RandomSplitter}.
+ *
+ * @param  The class type of this instance.
+ */
+public interface RandomSplitterParams extends WithParams {
+Param FRACTIONS =
+new DoubleArrayParam(
+"fractions",
+"The fractions of data splitting.",
+new Double[] {0.5},

Review Comment:
   The `weights` parameter of Spark's `RDD.randomSplit()` is a required 
parameter, thus it might be better to make the default value of our `fraction` 
to be `null` to match Spark's behavior.



##
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RandomSplitterTest.java:
##
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.randomsplitter.RandomSplitter;
+import 

[GitHub] [flink] reswqa commented on a diff in pull request #21132: [FLINK-29134][metrics] Do not repeatedly add useless metric updating tasks to avoid wasting resources

2022-10-24 Thread GitBox


reswqa commented on code in PR #21132:
URL: https://github.com/apache/flink/pull/21132#discussion_r1003944639


##
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##
@@ -43,120 +43,77 @@
 import org.junit.Test;

Review Comment:
   we should use `org.junit.jupiter.api.Test` instead of `org.junit.Test`



##
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##
@@ -43,120 +43,77 @@
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for the MetricFetcher. */
 public class MetricFetcherTest extends TestLogger {
 @Test
 public void testUpdate() {

Review Comment:
   ```suggestion
   void testUpdate() {
   ```



##
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##
@@ -43,120 +43,77 @@
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for the MetricFetcher. */
 public class MetricFetcherTest extends TestLogger {

Review Comment:
   ```suggestion
   @ExtendWith(TestLoggerExtension.class)
   class MetricFetcherTest {
   ```



##
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java:
##
@@ -43,120 +43,77 @@
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for the MetricFetcher. */
 public class MetricFetcherTest extends TestLogger {
 @Test
 public void testUpdate() {
 final Time timeout = Time.seconds(10L);
-
-// = setup TaskManager
-// 
=
-
 JobID jobID = new JobID();
 ResourceID tmRID = ResourceID.generate();
 
-// = setup QueryServices
-// 

-
-final MetricQueryServiceGateway jmQueryService =
-new TestingMetricQueryServiceGateway.Builder()
-.setQueryMetricsSupplier(
-() ->
-CompletableFuture.completedFuture(
-new MetricDumpSerialization
-
.MetricSerializationResult(
-new byte[0],
-new byte[0],
-new byte[0],
-new byte[0],
-0,
-0,
-0,
-0)))
-.build();
-
-MetricDumpSerialization.MetricSerializationResult requestMetricsAnswer 
=
-createRequestDumpAnswer(tmRID, jobID);
-final MetricQueryServiceGateway tmQueryService =
-new TestingMetricQueryServiceGateway.Builder()
-.setQueryMetricsSupplier(
-() -> 
CompletableFuture.completedFuture(requestMetricsAnswer))
-.build();
-
-// = setup JobManager
-// 

[GitHub] [flink] flinkbot commented on pull request #21141: [FLINK-29742][sql-gateway] Support completing statement in SqlGatewayService.

2022-10-24 Thread GitBox


flinkbot commented on PR #21141:
URL: https://github.com/apache/flink/pull/21141#issuecomment-1289913250

   
   ## CI report:
   
   * 45299902af8abdcc2da247f590416cf657cb9988 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] yuzelin commented on pull request #21133: [FLINK-29732][sql-gateway] support configuring session with SQL statement.

2022-10-24 Thread GitBox


yuzelin commented on PR #21133:
URL: https://github.com/apache/flink/pull/21133#issuecomment-1289910411

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29742) Support completing statement in SqlGatewayService.

2022-10-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29742:
---
Labels: pull-request-available  (was: )

> Support completing statement in SqlGatewayService.
> --
>
> Key: FLINK-29742
> URL: https://issues.apache.org/jira/browse/FLINK-29742
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway
>Affects Versions: 1.17
>Reporter: yuzelin
>Priority: Major
>  Labels: pull-request-available
>
> Implement SqlGatewayService#completeStatement described in 
> [FLIP-91|https://cwiki.apache.org/confluence/display/FLINK/FLIP-91%3A+Support+SQL+Gateway].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] yuzelin opened a new pull request, #21141: [FLINK-29742] Support completing statement in SqlGatewayService.

2022-10-24 Thread GitBox


yuzelin opened a new pull request, #21141:
URL: https://github.com/apache/flink/pull/21141

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on pull request #20830: [FLINK-29271]Change to byte array from bytebuffer to improve performance and compatible direct byte buffers

2022-10-24 Thread GitBox


luoyuxia commented on PR #20830:
URL: https://github.com/apache/flink/pull/20830#issuecomment-1289904570

   @jiangjiguang Also, could you please help rebase the master?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29747) [UI] Refactor runtime web from module-based to standalone components

2022-10-24 Thread Junhan Yang (Jira)
Junhan Yang created FLINK-29747:
---

 Summary: [UI] Refactor runtime web from module-based to standalone 
components
 Key: FLINK-29747
 URL: https://issues.apache.org/jira/browse/FLINK-29747
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Web Frontend
Reporter: Junhan Yang
Assignee: Junhan Yang


>From v14 onwards, Angular provides a capability of standalone components that 
>can be independently bootstrapping. This is a powerful feature in terms of 
>refactoring the application to be less-heavy and structurally clean. It also 
>enables the component-level lazy loading in routes, improving the web 
>performance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] luoyuxia commented on a diff in pull request #20830: [FLINK-29271]Change to byte array from bytebuffer to improve performance and compatible direct byte buffers

2022-10-24 Thread GitBox


luoyuxia commented on code in PR #20830:
URL: https://github.com/apache/flink/pull/20830#discussion_r1003945031


##
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RunLengthDecoder.java:
##
@@ -265,8 +265,16 @@ void readNextGroup() {
 while (valueIndex < this.currentCount) {
 // values are bit packed 8 at a time, so reading 
bitWidth will always work
 ByteBuffer buffer = in.slice(bitWidth);
-this.packer.unpack8Values(
-buffer, buffer.position(), this.currentBuffer, 
valueIndex);
+if (buffer.hasArray()) {
+this.packer.unpack8Values(

Review Comment:
   Could you please add some comments in here to expain why we change it?
   Now, we use a deprecated method, we may need some comments for it so that 
the others can know the context when come to this method for the case that the 
method is deleted by a higher parquet version in some days and flink needs to 
upgrad to the  higher parquet version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #20830: [FLINK-29271]Change to byte array from bytebuffer to improve performance and compatible direct byte buffers

2022-10-24 Thread GitBox


luoyuxia commented on code in PR #20830:
URL: https://github.com/apache/flink/pull/20830#discussion_r1003942184


##
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RunLengthDecoder.java:
##
@@ -265,8 +265,16 @@ void readNextGroup() {
 while (valueIndex < this.currentCount) {
 // values are bit packed 8 at a time, so reading 
bitWidth will always work
 ByteBuffer buffer = in.slice(bitWidth);
-this.packer.unpack8Values(
-buffer, buffer.position(), this.currentBuffer, 
valueIndex);
+if (buffer.hasArray()) {
+this.packer.unpack8Values(

Review Comment:
   Thanks for explanation. Make sense to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on pull request #21137: [FLINK-29234][runtime] JobMasterServiceLeadershipRunner handle leader event in a separate executor to avoid dead lock

2022-10-24 Thread GitBox


reswqa commented on PR #21137:
URL: https://github.com/apache/flink/pull/21137#issuecomment-1289898634

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29745) Split reader/writer factory for compaction in MergeTreeTest

2022-10-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29745:
---
Labels: pull-request-available  (was: )

> Split reader/writer factory for compaction in MergeTreeTest
> ---
>
> Key: FLINK-29745
> URL: https://issues.apache.org/jira/browse/FLINK-29745
> Project: Flink
>  Issue Type: Bug
>  Components: Table Store
>Affects Versions: table-store-0.2.2
>Reporter: Shammon
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] zjureel opened a new pull request, #331: [FLINK-29745] Use independent reader/writer factory for compaction in MergeTreeTest

2022-10-24 Thread GitBox


zjureel opened a new pull request, #331:
URL: https://github.com/apache/flink-table-store/pull/331

   Use independent reader/writer factory for compaction in `MergeTreeTest`, 
otherwise, there may be concurrency problems in the test case


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] czy006 commented on a diff in pull request #21012: [FLINK-29543] Jar Run Rest Handler Support Flink Configuration

2022-10-24 Thread GitBox


czy006 commented on code in PR #21012:
URL: https://github.com/apache/flink/pull/21012#discussion_r1003931113


##
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java:
##
@@ -211,6 +218,37 @@ void testProvideJobId() throws Exception {
 assertThat(jobGraph.get().getJobID()).isEqualTo(jobId);
 }
 
+@Test
+void testProvideFlinkConfig() throws Exception {
+
+REQB jarRequestBodyWithFlinkConfig = 
getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION);
+HandlerRequest request =
+createRequest(
+getJarRequestBodyWithFlinkConfig(FLINK_CONFIGURATION),
+getUnresolvedJarMessageParameters(),
+getUnresolvedJarMessageParameters(),
+jarWithManifest);
+
+handleRequest(request);
+
+Optional jobGraph = getLastSubmittedJobGraphAndReset();
+
+assertThat(jobGraph.isPresent()).isTrue();
+JobGraph graph = jobGraph.get();
+
+assertThat(getExecutionConfig(graph).getParallelism())
+.isNotEqualTo(
+Integer.valueOf(
+
FLINK_CONFIGURATION.get(CoreOptions.DEFAULT_PARALLELISM.key(;
+if (jarRequestBodyWithFlinkConfig instanceof JarRunRequestBody) {

Review Comment:
   Do I need to open a new issuse to complete this, or continue to complete 
this function on this issuse?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-20732) Implement Pulsar Sink on extend Sink API (FLIP-191)

2022-10-24 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-20732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen closed FLINK-20732.
-
Resolution: Fixed

> Implement Pulsar Sink on extend Sink API (FLIP-191)
> ---
>
> Key: FLINK-20732
> URL: https://issues.apache.org/jira/browse/FLINK-20732
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / Pulsar
>Affects Versions: 1.13.0
>Reporter: Jianyun Zhao
>Assignee: Yufan Sheng
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> Flink has deprecated the FLIP-143 and in favour of the new FLIP-191 API for 
> sink implementation. We would implement the Pulsar Sink based on the new 
> TwoPhaseCommittingSink interface. This sink could support both batch and 
> stream procession and follow the semantic of DeliveryGuarantee.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-26027) Add FLIP-33 metrics to new PulsarSink

2022-10-24 Thread Zili Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zili Chen updated FLINK-26027:
--
Parent: (was: FLINK-20732)
Issue Type: Improvement  (was: Sub-task)

> Add FLIP-33 metrics to new PulsarSink
> -
>
> Key: FLINK-26027
> URL: https://issues.apache.org/jira/browse/FLINK-26027
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Pulsar
>Reporter: Yufan Sheng
>Assignee: Yufei Zhang
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.17.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-table-store] zjureel commented on pull request #326: [FLINK-29702] Add micro benchmarks module and merge tree reader/writer benchmarks

2022-10-24 Thread GitBox


zjureel commented on PR #326:
URL: 
https://github.com/apache/flink-table-store/pull/326#issuecomment-1289880133

   Hi @JingsongLi I have split the reader/writer factory for compaction in the 
micro benchmarks and fix the concurrency problem, please review again, thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quant…

2022-10-24 Thread GitBox


zhipeng93 commented on code in PR #162:
URL: https://github.com/apache/flink-ml/pull/162#discussion_r1003924160


##
flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Helper class to compute an approximate quantile summary. This 
implementation is based on the
+ * algorithm proposed in the paper: "Space-efficient Online Computation of 
Quantile Summaries" by
+ * Greenwald, Michael and Khanna, Sanjeev. 
(https://doi.org/10.1145/375663.375670)
+ */
+public class QuantileSummary {
+
+private static final int BUFFER_SIZE = 1;

Review Comment:
   Hmm.. Is there any technical reasons for choosing `1` instead of 
`5`? If we want to avoid OOM, then why don't you choose `1000`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] libenchao commented on a diff in pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

2022-10-24 Thread GitBox


libenchao commented on code in PR #20140:
URL: https://github.com/apache/flink/pull/20140#discussion_r1003920728


##
flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml:
##
@@ -51,4 +51,22 @@ TableSourceScan(table=[[default_catalog, default_database, 
jdbc, project=[decima
 ]]>
 
   
+  
+   
+   
+   
+   
+   
+   
+   
+   

[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quant…

2022-10-24 Thread GitBox


zhipeng93 commented on code in PR #162:
URL: https://github.com/apache/flink-ml/pull/162#discussion_r1003921280


##
flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Helper class to compute an approximate quantile summary. This 
implementation is based on the
+ * algorithm proposed in the paper: "Space-efficient Online Computation of 
Quantile Summaries" by
+ * Greenwald, Michael and Khanna, Sanjeev. 
(https://doi.org/10.1145/375663.375670)
+ */
+public class QuantileSummary {

Review Comment:
   Yes you are right, some of them seems not marked appropriately. Technically 
some of them should be marked as `Internal`. 
   
   Can you mark this class as `Internal` in this PR? We may do others later.



##
flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Helper class to compute an approximate quantile summary. This 
implementation is based on the
+ * algorithm proposed in the paper: "Space-efficient Online Computation of 
Quantile Summaries" by
+ * Greenwald, Michael and Khanna, Sanjeev. 
(https://doi.org/10.1145/375663.375670)
+ */
+public class QuantileSummary {

Review Comment:
   Yes you are right, some of them seem not marked appropriately. Technically 
some of them should be marked as `Internal`. 
   
   Can you mark this class as `Internal` in this PR? We may do others later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #162: [FLINK-29593] Add QuantileSummary to help calculate approximate quant…

2022-10-24 Thread GitBox


zhipeng93 commented on code in PR #162:
URL: https://github.com/apache/flink-ml/pull/162#discussion_r1003921280


##
flink-ml-core/src/main/java/org/apache/flink/ml/util/QuantileSummary.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Helper class to compute an approximate quantile summary. This 
implementation is based on the
+ * algorithm proposed in the paper: "Space-efficient Online Computation of 
Quantile Summaries" by
+ * Greenwald, Michael and Khanna, Sanjeev. 
(https://doi.org/10.1145/375663.375670)
+ */
+public class QuantileSummary {

Review Comment:
   Yeah, some of them seems not marked appropriately. Technically some of them 
should be marked as `Internal`. 
   
   Can you mark this class as `Internal` in this PR? We may do others later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-29737) Support DataGen on waveform function

2022-10-24 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623501#comment-17623501
 ] 

Shengkai Fang commented on FLINK-29737:
---

Thanks for your idea. Could you share the PR link here?

> Support DataGen on waveform function
> 
>
> Key: FLINK-29737
> URL: https://issues.apache.org/jira/browse/FLINK-29737
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: chenzihao
>Priority: Major
> Attachments: image-2022-10-24-16-09-47-386.png, 
> image-2022-10-24-16-09-52-410.png
>
>
> In some scenarios, we need to simulate flow changes in the production 
> environment. The current DATAGEN feature only supports data generation at a 
> constant rate. We try to simulate increments of flow using batch jobs, but 
> the production rate is not smooth, so I suggest that we can support sin-based 
> data generation in order to get smooth changes. 
> 1. add another batch job to simulate increments of flow.
> !image-2022-10-24-16-09-52-410.png!
> 2. sin-based.
> !image-2022-10-24-16-09-47-386.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29217) CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint failed with AssertionFailedError

2022-10-24 Thread Xingbo Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingbo Huang updated FLINK-29217:
-
Fix Version/s: 1.16.1
   (was: 1.16.0)

> CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint
>  failed with AssertionFailedError
> -
>
> Key: FLINK-29217
> URL: https://issues.apache.org/jira/browse/FLINK-29217
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.16.0
>Reporter: Xingbo Huang
>Assignee: Yunfeng Zhou
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.16.1
>
>
> {code:java}
> 2022-09-07T02:00:50.2507464Z Sep 07 02:00:50 [ERROR] 
> org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint
>   Time elapsed: 2.137 s  <<< FAILURE!
> 2022-09-07T02:00:50.2508673Z Sep 07 02:00:50 
> org.opentest4j.AssertionFailedError: 
> 2022-09-07T02:00:50.2509309Z Sep 07 02:00:50 
> 2022-09-07T02:00:50.2509945Z Sep 07 02:00:50 Expecting value to be false but 
> was true
> 2022-09-07T02:00:50.2511950Z Sep 07 02:00:50  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 2022-09-07T02:00:50.2513254Z Sep 07 02:00:50  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 2022-09-07T02:00:50.2514621Z Sep 07 02:00:50  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 2022-09-07T02:00:50.2516342Z Sep 07 02:00:50  at 
> org.apache.flink.streaming.runtime.tasks.CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.testConcurrentCheckpoint(CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:173)
> 2022-09-07T02:00:50.2517852Z Sep 07 02:00:50  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2022-09-07T02:00:50.251Z Sep 07 02:00:50  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2022-09-07T02:00:50.2520065Z Sep 07 02:00:50  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2022-09-07T02:00:50.2521153Z Sep 07 02:00:50  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2022-09-07T02:00:50.2522747Z Sep 07 02:00:50  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 2022-09-07T02:00:50.2523973Z Sep 07 02:00:50  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2022-09-07T02:00:50.2525158Z Sep 07 02:00:50  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 2022-09-07T02:00:50.2526347Z Sep 07 02:00:50  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2022-09-07T02:00:50.2527525Z Sep 07 02:00:50  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2022-09-07T02:00:50.2528646Z Sep 07 02:00:50  at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> 2022-09-07T02:00:50.2529708Z Sep 07 02:00:50  at 
> org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
> 2022-09-07T02:00:50.2530744Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
> 2022-09-07T02:00:50.2532008Z Sep 07 02:00:50  at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 2022-09-07T02:00:50.2533137Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
> 2022-09-07T02:00:50.2544265Z Sep 07 02:00:50  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 2022-09-07T02:00:50.2545595Z Sep 07 02:00:50  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 2022-09-07T02:00:50.2546782Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
> 2022-09-07T02:00:50.2547810Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
> 2022-09-07T02:00:50.2548890Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
> 2022-09-07T02:00:50.2549932Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
> 2022-09-07T02:00:50.2550933Z Sep 07 02:00:50  at 
> org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
> 2022-09-07T02:00:50.2552325Z Sep 07 02:00:50  at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
> 2022-09-07T02:00:50.2553660Z Sep 07 02:00:50  at 
> org.junit.rules.RunRules.evaluate(RunRules.java:20)
> 

[GitHub] [flink-web] godfreyhe commented on a diff in pull request #574: Announcement blogpost for the 1.16 release

2022-10-24 Thread GitBox


godfreyhe commented on code in PR #574:
URL: https://github.com/apache/flink-web/pull/574#discussion_r1003917397


##
_posts/2022-10-15-1.16-announcement.md:
##
@@ -0,0 +1,402 @@
+---
+layout: post
+title:  "Announcing the Release of Apache Flink 1.16"
+subtitle: ""
+date: 2022-10-15T08:00:00.000Z
+categories: news
+authors:
+- godfreyhe:
+  name: "Godfrey He"
+  twitter: "godfreyhe"
+
+---
+
+Apache Flink continues to grow at a rapid pace and is one of the most active 
+communities in Apache. Flink 1.16 had over 230 contributors enthusiastically 
participating, 
+with 19 FLIPs and 900+ issues completed, bringing a lot of exciting features 
to the community.
+
+Flink has become the leading role and factual standard of stream processing, 
+and the concept of the unification of stream and batch data processing is 
gradually gaining recognition 
+and is being successfully implemented in more and more companies. Previously, 
+the integrated stream and batch concept placed more emphasis on a unified API 
and 
+a unified computing framework. This year, based on this, Flink proposed 
+the next development direction of [Flink-Streaming 
Warehouse](https://www.alibabacloud.com/blog/more-than-computing-a-new-era-led-by-the-warehouse-architecture-of-apache-flink_598821)
 (Streamhouse), 
+which further upgraded the scope of stream-batch integration: it truly 
completes not only 
+the unified computation but also unified storage, thus realizing unified 
real-time analysis.
+
+In 1.16, the Flink community has completed many improvements for both batch 
and stream processing:
+
+- For batch processing, all-round improvements in ease of use, stability and 
performance 
+ have been completed. 1.16 is a milestone version of Flink batch processing 
and an important 
+ step towards maturity.
+  - Ease of use:  with the introduction of SQL Gateway and full compatibility 
with Hive Server2, 
+  users can submit Flink SQL jobs and Hive SQL jobs very easily, and it is 
also easy to 
+  connect to the original Hive ecosystem.
+  - Functionality: Introduce Join hints which let Flink SQL users manually 
specify join strategies
+  to avoid unreasonable execution plans. The compatibility of Hive SQL has 
reached 94%, 
+  and users can migrate Hive to Flink at a very low cost.
+  - Stability: Propose a speculative execution mechanism to reduce the long 
tail sub-tasks of
+  a job and improve the stability. Improve HashJoin and introduce failure 
rollback mechanism
+  to avoid join failure.
+  - Performance: Introduce dynamic partition pruning to reduce the Scan I/O 
and improve join 
+  processing for the star-schema queries. There is 30% improvement in the 
TPC-DS benchmark. 
+  We can use hybrid shuffle mode to improve resource usage and processing 
performance.
+- For stream processing, there are a number of significant improvements:
+  - Changelog State Backend provides users with second or even millisecond 
checkpoints to 
+  dramatically improve the fault tolerance experience, while providing a 
smaller end-to-end 
+  latency experience for transactional Sink jobs.
+  - Lookup join is widely used in stream processing. Slow lookup speed, low 
throughput and 
+  delay update are resolved through common cache mechanism, asynchronous io 
and retriable lookup. 
+  These features are very useful, solving the pain points that users often 
complain about, 
+  and supporting richer scenarios.
+  - From the first day of the birth of Flink SQL, there were some 
non-deterministic operations 
+  that could cause incorrect results or exceptions, which caused great 
distress to users. 
+  In 1.16, we spent a lot of effort to solve most of the problems, and we will 
continue to 
+  improve in the future.
+
+With the further refinement of the integration of stream and batch, and the 
continuous iteration of 
+the Flink Table Store ([0.2 has been 
released](https://flink.apache.org/news/2022/08/29/release-table-store-0.2.0.html)),
 
+the Flink community is pushing the Streaming warehouse from concept to reality 
and maturity step by step.
+
+# Understanding Streaming Warehouses
+
+To be precise, a streaming warehouse is to make data warehouse streaming, 
which allows the data 
+for each layer in the whole warehouse to flow in real-time. The goal is to 
realize 
+a Streaming Service with end-to-end real-time performance through a unified 
API and computing framework.
+Please refer to [the 
article](https://www.alibabacloud.com/blog/more-than-computing-a-new-era-led-by-the-warehouse-architecture-of-apache-flink_598821)
 
+for more details.
+
+# Batch processing
+
+Flink is a unified stream batch processing engine, stream processing has 
become the leading role 
+thanks to our long-term investment. We’re also putting more effort to improve 
batch processing 
+to make it an excellent computing engine. This makes the overall experience of 
stream batch 
+unification smoother.
+
+## SQL Gateway
+
+The feedback from various channels 

[jira] [Closed] (FLINK-28914) Could not find any factories that implement

2022-10-24 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-28914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang closed FLINK-28914.
-
Resolution: Fixed

> Could not find any factories that implement
> ---
>
> Key: FLINK-28914
> URL: https://issues.apache.org/jira/browse/FLINK-28914
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Gateway
>Affects Versions: 1.16.0
>Reporter: Dongming WU
>Priority: Major
> Fix For: 1.17.0
>
>
> 2022-08-11 11:09:53,135 ERROR org.apache.flink.table.gateway.SqlGateway       
>              [] - Failed to start the endpoints.
> org.apache.flink.table.api.ValidationException: Could not find any factories 
> that implement 
> 'org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactory' in 
> the classpath.
> -
> I packaged Flink-Master and tried to start sql-gateway, but some problems 
> arise.
> I found tow problem with Factory under resources of flink-sql-gateway module.
> META-INF.services should not be a folder name, ti should be ... 
> /META-INF/services/... 
> The 
> `` org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory ``  in 
> the org.apache.flink.table.factories.Factory file should be 
> ``  org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointFactory `` 
> . 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27773) Introduce the E2E tests for SQL Gateway

2022-10-24 Thread Shengkai Fang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623496#comment-17623496
 ] 

Shengkai Fang commented on FLINK-27773:
---

[~martijnvisser] It has been implemented in the 1.16.

> Introduce the E2E tests for SQL Gateway 
> 
>
> Key: FLINK-27773
> URL: https://issues.apache.org/jira/browse/FLINK-27773
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway
>Reporter: Shengkai Fang
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27773) Introduce the E2E tests for SQL Gateway

2022-10-24 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang updated FLINK-27773:
--
Fix Version/s: 1.16.0

> Introduce the E2E tests for SQL Gateway 
> 
>
> Key: FLINK-27773
> URL: https://issues.apache.org/jira/browse/FLINK-27773
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway
>Reporter: Shengkai Fang
>Assignee: Wencong Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-29732) Support configuring session with SQL statement

2022-10-24 Thread Shengkai Fang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shengkai Fang reassigned FLINK-29732:
-

Assignee: yuzelin

> Support configuring session with SQL statement
> --
>
> Key: FLINK-29732
> URL: https://issues.apache.org/jira/browse/FLINK-29732
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Gateway
>Affects Versions: 1.17
>Reporter: yuzelin
>Assignee: yuzelin
>Priority: Major
>  Labels: pull-request-available
>
> Implement SqlGatewayService#configureSession described in 
> [FLIP-91|https://cwiki.apache.org/confluence/display/FLINK/FLIP-91%3A+Support+SQL+Gateway].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-web] godfreyhe commented on a diff in pull request #574: Announcement blogpost for the 1.16 release

2022-10-24 Thread GitBox


godfreyhe commented on code in PR #574:
URL: https://github.com/apache/flink-web/pull/574#discussion_r1003914702


##
_posts/2022-10-15-1.16-announcement.md:
##
@@ -0,0 +1,402 @@
+---
+layout: post
+title:  "Announcing the Release of Apache Flink 1.16"
+subtitle: ""
+date: 2022-10-15T08:00:00.000Z
+categories: news
+authors:
+- godfreyhe:
+  name: "Godfrey He"
+  twitter: "godfreyhe"
+
+---
+
+Apache Flink continues to grow at a rapid pace and is one of the most active 
+communities in Apache. Flink 1.16 had over 230 contributors enthusiastically 
participating, 
+with 19 FLIPs and 900+ issues completed, bringing a lot of exciting features 
to the community.
+
+Flink has become the leading role and factual standard of stream processing, 
+and the concept of the unification of stream and batch data processing is 
gradually gaining recognition 
+and is being successfully implemented in more and more companies. Previously, 
+the integrated stream and batch concept placed more emphasis on a unified API 
and 
+a unified computing framework. This year, based on this, Flink proposed 
+the next development direction of [Flink-Streaming 
Warehouse](https://www.alibabacloud.com/blog/more-than-computing-a-new-era-led-by-the-warehouse-architecture-of-apache-flink_598821)
 (Streamhouse), 
+which further upgraded the scope of stream-batch integration: it truly 
completes not only 
+the unified computation but also unified storage, thus realizing unified 
real-time analysis.
+
+In 1.16, the Flink community has completed many improvements for both batch 
and stream processing:
+
+- For batch processing, all-round improvements in ease of use, stability and 
performance 
+ have been completed. 1.16 is a milestone version of Flink batch processing 
and an important 
+ step towards maturity.
+  - Ease of use:  with the introduction of SQL Gateway and full compatibility 
with Hive Server2, 
+  users can submit Flink SQL jobs and Hive SQL jobs very easily, and it is 
also easy to 
+  connect to the original Hive ecosystem.
+  - Functionality: Introduce Join hints which let Flink SQL users manually 
specify join strategies
+  to avoid unreasonable execution plans. The compatibility of Hive SQL has 
reached 94%, 
+  and users can migrate Hive to Flink at a very low cost.
+  - Stability: Propose a speculative execution mechanism to reduce the long 
tail sub-tasks of
+  a job and improve the stability. Improve HashJoin and introduce failure 
rollback mechanism
+  to avoid join failure.
+  - Performance: Introduce dynamic partition pruning to reduce the Scan I/O 
and improve join 
+  processing for the star-schema queries. There is 30% improvement in the 
TPC-DS benchmark. 
+  We can use hybrid shuffle mode to improve resource usage and processing 
performance.
+- For stream processing, there are a number of significant improvements:
+  - Changelog State Backend provides users with second or even millisecond 
checkpoints to 
+  dramatically improve the fault tolerance experience, while providing a 
smaller end-to-end 
+  latency experience for transactional Sink jobs.
+  - Lookup join is widely used in stream processing. Slow lookup speed, low 
throughput and 
+  delay update are resolved through common cache mechanism, asynchronous io 
and retriable lookup. 
+  These features are very useful, solving the pain points that users often 
complain about, 
+  and supporting richer scenarios.
+  - From the first day of the birth of Flink SQL, there were some 
non-deterministic operations 
+  that could cause incorrect results or exceptions, which caused great 
distress to users. 
+  In 1.16, we spent a lot of effort to solve most of the problems, and we will 
continue to 
+  improve in the future.
+
+With the further refinement of the integration of stream and batch, and the 
continuous iteration of 
+the Flink Table Store ([0.2 has been 
released](https://flink.apache.org/news/2022/08/29/release-table-store-0.2.0.html)),
 
+the Flink community is pushing the Streaming warehouse from concept to reality 
and maturity step by step.
+
+# Understanding Streaming Warehouses
+
+To be precise, a streaming warehouse is to make data warehouse streaming, 
which allows the data 
+for each layer in the whole warehouse to flow in real-time. The goal is to 
realize 
+a Streaming Service with end-to-end real-time performance through a unified 
API and computing framework.
+Please refer to [the 
article](https://www.alibabacloud.com/blog/more-than-computing-a-new-era-led-by-the-warehouse-architecture-of-apache-flink_598821)
 
+for more details.
+
+# Batch processing
+
+Flink is a unified stream batch processing engine, stream processing has 
become the leading role 
+thanks to our long-term investment. We’re also putting more effort to improve 
batch processing 
+to make it an excellent computing engine. This makes the overall experience of 
stream batch 
+unification smoother.
+
+## SQL Gateway
+
+The feedback from various channels 

[GitHub] [flink-kubernetes-operator] morhidi closed pull request #372: [FLINK-29261] Use FAIL_ON_UNKNOWN_PROPERTIES=false in ReconciliationUtils

2022-10-24 Thread GitBox


morhidi closed pull request #372: [FLINK-29261] Use 
FAIL_ON_UNKNOWN_PROPERTIES=false in ReconciliationUtils
URL: https://github.com/apache/flink-kubernetes-operator/pull/372


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #410: Added the CRD compatability check to kubernetes operator 1.2.0

2022-10-24 Thread GitBox


morhidi commented on PR #410:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/410#issuecomment-1289856878

   +1 LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-29746) Add workflow in github for micro benchmarks

2022-10-24 Thread Shammon (Jira)
Shammon created FLINK-29746:
---

 Summary: Add workflow in github for micro benchmarks
 Key: FLINK-29746
 URL: https://issues.apache.org/jira/browse/FLINK-29746
 Project: Flink
  Issue Type: Sub-task
  Components: Table Store
Affects Versions: table-store-0.2.2
Reporter: Shammon


Add workflow in github for micro benchmarks project



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-24 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623481#comment-17623481
 ] 

Benchao Li commented on FLINK-29692:


{quote}'Early fire' would periodically sent intermediate result to downstream, 
the frequency to send intermediate result is based on processing time, even 
thought the window itself based on event time. It mixed up processing time 
semantics with the time attribute of Window.
{quote}
I agree, it's not perfect.
{quote}If you need to retire window state and send intermediate result, how 
about trying "[cumulate window 
tvf|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate];
 instead of 'early fire'?
{quote}
Cumulate window could cover most cases, except that it's semantic is different 
from others (unbounded aggregate and window with early-fire's output semantic 
is changelog for the same grouping key).

> Support early/late fires for Windowing TVFs
> ---
>
> Key: FLINK-29692
> URL: https://issues.apache.org/jira/browse/FLINK-29692
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.15.2
>Reporter: Canope Nerda
>Priority: Major
>
> I have cases where I need to 1) output data as soon as possible and 2) handle 
> late arriving data to achieve eventual correctness. In the logic, I need to 
> do window deduplication which is based on Windowing TVFs and according to 
> source code, early/late fires are not supported yet in Windowing TVFs.
> Actually 1) contradicts with 2). Without early/late fires, we had to 
> compromise, either live with fresh incorrect data or tolerate excess latency 
> for correctness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29745) Split reader/writer factory for compaction in MergeTreeTest

2022-10-24 Thread Shammon (Jira)
Shammon created FLINK-29745:
---

 Summary: Split reader/writer factory for compaction in 
MergeTreeTest
 Key: FLINK-29745
 URL: https://issues.apache.org/jira/browse/FLINK-29745
 Project: Flink
  Issue Type: Bug
  Components: Table Store
Affects Versions: table-store-0.2.2
Reporter: Shammon






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29744) Throw DeploymentFailedException on ImagePullBackOff

2022-10-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-29744:
---
Labels: pull-request-available  (was: )

> Throw DeploymentFailedException on ImagePullBackOff
> ---
>
> Key: FLINK-29744
> URL: https://issues.apache.org/jira/browse/FLINK-29744
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: Matyas Orhidi
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] morhidi opened a new pull request, #411: [FLINK-29744] Throw DeploymentFailedException on ImagePullBackOff

2022-10-24 Thread GitBox


morhidi opened a new pull request, #411:
URL: https://github.com/apache/flink-kubernetes-operator/pull/411

   ## What is the purpose of the change
   
   Handle cases when pod is unable to pull images. (a.k.a `ImagePullBackOff`)
   
   ## Brief change log
   
   Code is already handling `CrashLoopBackOff` this PR adds extra 
`ImagePullBackOff ` handling.
   
   ## Verifying this change
   
   Added parameterized 
`FlinkDeploymentControllerTest.verifyInProgressDeploymentWithBackoff()`
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RocMarshal commented on pull request #20990: [FLINK-29050][JUnit5 Migration] Module: flink-hadoop-compatibility

2022-10-24 Thread GitBox


RocMarshal commented on PR #20990:
URL: https://github.com/apache/flink/pull/20990#issuecomment-1289823623

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-29744) Throw DeploymentFailedException on ImagePullBackOff

2022-10-24 Thread Matyas Orhidi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matyas Orhidi updated FLINK-29744:
--
Affects Version/s: kubernetes-operator-1.3.0

> Throw DeploymentFailedException on ImagePullBackOff
> ---
>
> Key: FLINK-29744
> URL: https://issues.apache.org/jira/browse/FLINK-29744
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.3.0
>Reporter: Matyas Orhidi
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-29744) Throw DeploymentFailedException on ImagePullBackOff

2022-10-24 Thread Matyas Orhidi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matyas Orhidi updated FLINK-29744:
--
Component/s: Kubernetes Operator

> Throw DeploymentFailedException on ImagePullBackOff
> ---
>
> Key: FLINK-29744
> URL: https://issues.apache.org/jira/browse/FLINK-29744
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Matyas Orhidi
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29744) Throw DeploymentFailedException on ImagePullBackOff

2022-10-24 Thread Matyas Orhidi (Jira)
Matyas Orhidi created FLINK-29744:
-

 Summary: Throw DeploymentFailedException on ImagePullBackOff
 Key: FLINK-29744
 URL: https://issues.apache.org/jira/browse/FLINK-29744
 Project: Flink
  Issue Type: Improvement
Reporter: Matyas Orhidi






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-24 Thread GitBox


morhidi commented on PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#issuecomment-1289657005

   > @morhidi
   > 
   > Thanks for the feedback and sharing your test result:
   > 
   > > It resulted in an string, that might not be a valid JSON:
   > 
   > I will do some testing on this and validate the final JSON that's included 
in the CR field.
   > 
   > > Another thing that I found is the structure of the stacktrace probably 
too verbose:
   > 
   > Yup, it does contain several metadata we might not need like 
"classLoaderName", "nativeMethod". An alternative that I am considering is to 
store the stackTrace as a more compact List generated from 
ExceptionUtils.getStackFrames 
(https://commons.apache.org/proper/commons-lang/javadocs/api-3.1/org/apache/commons/lang3/exception/ExceptionUtils.html#getStackFrames(java.lang.Throwable))
   > 
   > However, the downside of this is each stackFrame is a String, so it's a 
bit more work to figure out which method/file the stackFrame is called from. 
Any thoughts on which one's more suited for our needs?
   
   We might not need the stack trace in a structured format. Maybe just dump it 
as a string:
   
   ```
   public static ExceptionDetail convertFromThrowable(Throwable t) {
   ExceptionDetail exceptionDetail = new ExceptionDetail();
   exceptionDetail.setMessage(t.toString());
   
exceptionDetail.setRootCauseMessage(Throwables.getRootCause(t).toString());
   exceptionDetail.setStacktrace(Throwables.getStackTraceAsString(t));
   return exceptionDetail;
   }
   ```
   
   Even if the root cause is not in the first cause. We would still find it I 
guess in the stack trace.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-24 Thread GitBox


morhidi commented on PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#issuecomment-1289654405

   > @morhidi
   > 
   > Thanks for the feedback and sharing your test result:
   > 
   > > It resulted in an string, that might not be a valid JSON:
   > 
   > I will do some testing on this and validate the final JSON that's included 
in the CR field.
   > 
   > > Another thing that I found is the structure of the stacktrace probably 
too verbose:
   > 
   > Yup, it does contain several metadata we might not need like 
"classLoaderName", "nativeMethod". An alternative that I am considering is to 
store the stackTrace as a more compact List generated from 
ExceptionUtils.getStackFrames 
(https://commons.apache.org/proper/commons-lang/javadocs/api-3.1/org/apache/commons/lang3/exception/ExceptionUtils.html#getStackFrames(java.lang.Throwable))
   > 
   > However, the downside of this is each stackFrame is a String, so it's a 
bit more work to figure out which method/file the stackFrame is called from. 
Any thoughts on which one's more suited for our needs?
   
   The error JSON looks good in the logs. Things go wrong when you it is saved 
in the CS, and you dump it as a yaml. It breaks the single line error JSON into 
a multiline YAML structure.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] darenwkt commented on pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-24 Thread GitBox


darenwkt commented on PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#issuecomment-1289640460

   @morhidi 
   
   Thanks for the feedback and sharing your test result:
   
   >It resulted in an string, that might not be a valid JSON:
   
   I will do some testing on this and validate the final JSON that's included 
in the CR field.
   
   >Another thing that I found is the structure of the stacktrace probably too 
verbose:
   
   Yup, it does contain several metadata we might not need like 
"classLoaderName", "nativeMethod". An alternative that I am considering is to 
store the stackTrace as a more compact List generated from 
ExceptionUtils.getStackFrames
   
(https://commons.apache.org/proper/commons-lang/javadocs/api-3.1/org/apache/commons/lang3/exception/ExceptionUtils.html#getStackFrames(java.lang.Throwable))
   
   However, the downside of this is each stackFrame is a String, so it's a bit 
more work to figure out which method/file the stackFrame is called from. Any 
thoughts on which one's more suited for our needs?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] darenwkt commented on a diff in pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-24 Thread GitBox


darenwkt commented on code in PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#discussion_r1003776097


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.exception.FlinkResourceException;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.runtime.rest.util.RestClientException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Flink Resource Exception utilities. */
+public final class FlinkResourceExceptionUtils {
+
+public static  void 
updateFlinkResourceException(
+Throwable throwable, R resource) {
+
+FlinkResourceException flinkResourceException =
+getFlinkResourceException(throwable, 
getStackTraceEnabled(resource));
+
+try {
+((AbstractFlinkResource) resource)
+.getStatus()
+.setError(convertToJson(flinkResourceException));
+} catch (Exception e) {
+// Rollback to setting error string/message to CRD
+((AbstractFlinkResource) resource)
+.getStatus()
+.setError(
+(e instanceof ReconciliationException)
+? e.getCause().toString()
+: e.toString());
+}
+}
+
+private static  boolean 
getStackTraceEnabled(R resource) {
+return Optional.ofNullable((AbstractFlinkSpec) resource.getSpec())
+.map(AbstractFlinkSpec::isStackTraceEnabled)
+.orElse(false);
+}
+
+private static FlinkResourceException getFlinkResourceException(
+Throwable throwable, boolean isStackTraceEnabled) {
+FlinkResourceException flinkResourceException =
+convertToFlinkResourceException(throwable, 
isStackTraceEnabled);
+
+flinkResourceException.setThrowableList(
+ExceptionUtils.getThrowableList(
+throwable.getCause())
+.stream()
+.map((t) -> convertToFlinkResourceException(t, false))
+.collect(Collectors.toList()));
+
+return flinkResourceException;
+}
+
+private static FlinkResourceException convertToFlinkResourceException(
+Throwable throwable, boolean isStackTraceEnabled) {
+FlinkResourceException flinkResourceException =
+FlinkResourceException.builder()
+.type(throwable.getClass().getName())
+.message(throwable.getMessage())
+.build();
+
+if (isStackTraceEnabled) {
+
flinkResourceException.setStackTraceElements(throwable.getStackTrace());
+}
+
+enrichMetadata(throwable, flinkResourceException);
+
+return flinkResourceException;
+}
+
+private static void enrichMetadata(
+Throwable throwable, FlinkResourceException 
flinkResourceException) {
+if (throwable instanceof RestClientException) {
+flinkResourceException.setAdditionalMetadata(
+Map.of(
+"httpResponseCode",
+((RestClientException) 
throwable).getHttpResponseStatus().code()));
+}
+
+if (throwable instanceof DeploymentFailedException) {
+flinkResourceException.setAdditionalMetadata(
+Map.of("reason", ((DeploymentFailedException) 

[GitHub] [flink-kubernetes-operator] darenwkt commented on a diff in pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-24 Thread GitBox


darenwkt commented on code in PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#discussion_r1003775892


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.exception.FlinkResourceException;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.runtime.rest.util.RestClientException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Flink Resource Exception utilities. */
+public final class FlinkResourceExceptionUtils {
+
+public static  void 
updateFlinkResourceException(
+Throwable throwable, R resource) {
+
+FlinkResourceException flinkResourceException =
+getFlinkResourceException(throwable, 
getStackTraceEnabled(resource));
+
+try {
+((AbstractFlinkResource) resource)
+.getStatus()
+.setError(convertToJson(flinkResourceException));
+} catch (Exception e) {
+// Rollback to setting error string/message to CRD
+((AbstractFlinkResource) resource)
+.getStatus()
+.setError(
+(e instanceof ReconciliationException)
+? e.getCause().toString()
+: e.toString());
+}
+}
+
+private static  boolean 
getStackTraceEnabled(R resource) {
+return Optional.ofNullable((AbstractFlinkSpec) resource.getSpec())
+.map(AbstractFlinkSpec::isStackTraceEnabled)
+.orElse(false);
+}
+
+private static FlinkResourceException getFlinkResourceException(
+Throwable throwable, boolean isStackTraceEnabled) {
+FlinkResourceException flinkResourceException =
+convertToFlinkResourceException(throwable, 
isStackTraceEnabled);
+
+flinkResourceException.setThrowableList(
+ExceptionUtils.getThrowableList(
+throwable.getCause())
+.stream()
+.map((t) -> convertToFlinkResourceException(t, false))
+.collect(Collectors.toList()));
+
+return flinkResourceException;
+}
+
+private static FlinkResourceException convertToFlinkResourceException(
+Throwable throwable, boolean isStackTraceEnabled) {
+FlinkResourceException flinkResourceException =
+FlinkResourceException.builder()
+.type(throwable.getClass().getName())
+.message(throwable.getMessage())
+.build();
+
+if (isStackTraceEnabled) {
+
flinkResourceException.setStackTraceElements(throwable.getStackTrace());

Review Comment:
   Added ConfigOption to limit stackTrace list size



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] darenwkt commented on a diff in pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-24 Thread GitBox


darenwkt commented on code in PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#discussion_r1003775450


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.exception.FlinkResourceException;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.runtime.rest.util.RestClientException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Flink Resource Exception utilities. */
+public final class FlinkResourceExceptionUtils {
+
+public static  void 
updateFlinkResourceException(
+Throwable throwable, R resource) {
+
+FlinkResourceException flinkResourceException =
+getFlinkResourceException(throwable, 
getStackTraceEnabled(resource));
+
+try {
+((AbstractFlinkResource) resource)
+.getStatus()
+.setError(convertToJson(flinkResourceException));
+} catch (Exception e) {
+// Rollback to setting error string/message to CRD
+((AbstractFlinkResource) resource)
+.getStatus()
+.setError(
+(e instanceof ReconciliationException)
+? e.getCause().toString()
+: e.toString());
+}
+}
+
+private static  boolean 
getStackTraceEnabled(R resource) {
+return Optional.ofNullable((AbstractFlinkSpec) resource.getSpec())
+.map(AbstractFlinkSpec::isStackTraceEnabled)
+.orElse(false);
+}
+
+private static FlinkResourceException getFlinkResourceException(
+Throwable throwable, boolean isStackTraceEnabled) {
+FlinkResourceException flinkResourceException =
+convertToFlinkResourceException(throwable, 
isStackTraceEnabled);
+
+flinkResourceException.setThrowableList(
+ExceptionUtils.getThrowableList(
+throwable.getCause())
+.stream()
+.map((t) -> convertToFlinkResourceException(t, false))
+.collect(Collectors.toList()));

Review Comment:
   I have added the default limit as ConfigOption and set it to 2



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] darenwkt commented on a diff in pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-24 Thread GitBox


darenwkt commented on code in PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#discussion_r1003775090


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkResourceExceptionUtils.java:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
+import org.apache.flink.kubernetes.operator.exception.FlinkResourceException;
+import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import org.apache.flink.runtime.rest.util.RestClientException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Flink Resource Exception utilities. */
+public final class FlinkResourceExceptionUtils {
+
+public static  void 
updateFlinkResourceException(
+Throwable throwable, R resource) {
+
+FlinkResourceException flinkResourceException =
+getFlinkResourceException(throwable, 
getStackTraceEnabled(resource));
+
+try {
+((AbstractFlinkResource) resource)
+.getStatus()
+.setError(convertToJson(flinkResourceException));
+} catch (Exception e) {
+// Rollback to setting error string/message to CRD
+((AbstractFlinkResource) resource)
+.getStatus()
+.setError(
+(e instanceof ReconciliationException)
+? e.getCause().toString()
+: e.toString());
+}
+}
+
+private static  boolean 
getStackTraceEnabled(R resource) {
+return Optional.ofNullable((AbstractFlinkSpec) resource.getSpec())
+.map(AbstractFlinkSpec::isStackTraceEnabled)
+.orElse(false);
+}
+
+private static FlinkResourceException getFlinkResourceException(
+Throwable throwable, boolean isStackTraceEnabled) {
+FlinkResourceException flinkResourceException =
+convertToFlinkResourceException(throwable, 
isStackTraceEnabled);
+
+flinkResourceException.setThrowableList(
+ExceptionUtils.getThrowableList(
+throwable.getCause())
+.stream()
+.map((t) -> convertToFlinkResourceException(t, false))
+.collect(Collectors.toList()));
+
+return flinkResourceException;
+}
+
+private static FlinkResourceException convertToFlinkResourceException(
+Throwable throwable, boolean isStackTraceEnabled) {
+FlinkResourceException flinkResourceException =
+FlinkResourceException.builder()
+.type(throwable.getClass().getName())
+.message(throwable.getMessage())
+.build();
+
+if (isStackTraceEnabled) {
+
flinkResourceException.setStackTraceElements(throwable.getStackTrace());
+}
+
+enrichMetadata(throwable, flinkResourceException);
+
+return flinkResourceException;
+}
+
+private static void enrichMetadata(
+Throwable throwable, FlinkResourceException 
flinkResourceException) {
+if (throwable instanceof RestClientException) {
+flinkResourceException.setAdditionalMetadata(
+Map.of(
+"httpResponseCode",
+((RestClientException) 
throwable).getHttpResponseStatus().code()));
+}
+
+if (throwable instanceof DeploymentFailedException) {
+flinkResourceException.setAdditionalMetadata(
+Map.of("reason", ((DeploymentFailedException) 

[GitHub] [flink-kubernetes-operator] darenwkt commented on a diff in pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-24 Thread GitBox


darenwkt commented on code in PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#discussion_r1003773876


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/AbstractFlinkSpec.java:
##
@@ -57,6 +57,9 @@ public abstract class AbstractFlinkSpec implements 
Diffable {
 })
 private Map flinkConfiguration;
 
+/** Exception specification to include stack trace in CRD. */
+private boolean stackTraceEnabled;

Review Comment:
   Thanks for the suggestion, I have moved them to ConfigOption along with new 
config to limit the stacktrace, throwable size etc...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-24 Thread GitBox


morhidi commented on PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#issuecomment-1289601914

   Another thing that I found is the structure of the stacktrace probably too 
verbose:
   
   ```{
 "type": 
"org.apache.flink.kubernetes.operator.exception.ReconciliationException",
 "message": "java.lang.IllegalArgumentException: Only \"local\" is 
supported as schema for application mode. This assumes that the jar is located 
in the image, not the Flink client. An example of such pathis: 
local:///opt/flink/examples/streaming/WindowJoin.jar",
 "stackTraceElements": [
   {
 "classLoaderName": "app",
 "moduleName": null,
 "moduleVersion": null,
 "methodName": "reconcile",
 "fileName": "FlinkDeploymentController.java",
 "lineNumber": 133,
 "className": 
"org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController",
 "nativeMethod": false
   },
   {
 "classLoaderName": "app",
 "moduleName": null,
 "moduleVersion": null,
 "methodName": "reconcile",
 "fileName": "FlinkDeploymentController.java",
 "lineNumber": 54,
 "className": 
"org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController",
 "nativeMethod": false
   },
   {
 "classLoaderName": "app",
 "moduleName": null,
 "moduleVersion": null,
 "methodName": "execute",
 "fileName": "Controller.java",
 "lineNumber": 130,
 "className": "io.javaoperatorsdk.operator.processing.Controller$1",
 "nativeMethod": false
   },
   {
 "classLoaderName": "app",
 "moduleName": null,
 "moduleVersion": null,
 "methodName": "execute",
 "fileName": "Controller.java",
 "lineNumber": 88,
 "className": "io.javaoperatorsdk.operator.processing.Controller$1",
 "nativeMethod": false
   },
   {
 "classLoaderName": "app",
 "moduleName": null,
 "moduleVersion": null,
 "methodName": "timeControllerExecution",
 "fileName": "Metrics.java",
 "lineNumber": 197,
 "className": "io.javaoperatorsdk.operator.api.monitoring.Metrics",
 "nativeMethod": false
   },
   {
 "classLoaderName": "app",
 "moduleName": null,
 "moduleVersion": null,
 "methodName": "reconcile",
 "fileName": "Controller.java",
 "lineNumber": 87,
 "className": "io.javaoperatorsdk.operator.processing.Controller",
 "nativeMethod": false
   },
   {
 "classLoaderName": "app",
 "moduleName": null,
 "moduleVersion": null,
 "methodName": "reconcileExecution",
 "fileName": "ReconciliationDispatcher.java",
 "lineNumber": 135,
 "className": 
"io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher",
 "nativeMethod": false
   },
   {
 "classLoaderName": "app",
 "moduleName": null,
 "moduleVersion": null,
 "methodName": "handleReconcile",
 "fileName": "ReconciliationDispatcher.java",
 "lineNumber": 115,
 "className": 
"io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher",
 "nativeMethod": false
   },
   {
 "classLoaderName": "app",
 "moduleName": null,
 "moduleVersion": null,
 "methodName": "handleDispatch",
 "fileName": "ReconciliationDispatcher.java",
 "lineNumber": 86,
 "className": 
"io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher",
 "nativeMethod": false
   },
   {
 "classLoaderName": "app",
 "moduleName": null,
 "moduleVersion": null,
 "methodName": "handleExecution",
 "fileName": "ReconciliationDispatcher.java",
 "lineNumber": 59,
 "className": 
"io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher",
 "nativeMethod": false
   },
   {
 "classLoaderName": "app",
 "moduleName": null,
 "moduleVersion": null,
 "methodName": "run",
 "fileName": "EventProcessor.java",
 "lineNumber": 395,
 "className": 
"io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor",
 "nativeMethod": false
   },
   {
 "classLoaderName": null,
 "moduleName": "java.base",
 "moduleVersion": "11.0.14",
 "methodName": "runWorker",
 "fileName": "ThreadPoolExecutor.java",
 "lineNumber": 1128,
 "className": "java.util.concurrent.ThreadPoolExecutor",
 "nativeMethod": false
   },
   {
 "classLoaderName": null,
 "moduleName": "java.base",
 "moduleVersion": "11.0.14",
 "methodName": "run",
 "fileName": "ThreadPoolExecutor.java",
  

[jira] [Comment Edited] (FLINK-29235) CVE-2022-25857 on flink-shaded

2022-10-24 Thread Sergio Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623369#comment-17623369
 ] 

Sergio Sainz edited comment on FLINK-29235 at 10/24/22 7:45 PM:


Hello [~chesnay] 

Noticed the flink-shaded-jackson v2.13.4-16.0 already has the fix (it uses 
jackson's own snakeyaml version which is 1.31). Could we upgrade flink-shaded 
version in flink version 1.16.0 to use 2.13.4-16.0?

[https://github.com/apache/flink/blob/release-1.16.0-rc2/pom.xml#L125]


{{16.0    
}}{{2.13.4}}

 

Ref:

[https://repo1.maven.org/maven2/com/fasterxml/jackson/dataformat/jackson-dataformat-yaml/2.13.4/jackson-dataformat-yaml-2.13.4.pom]

[https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-jackson/2.13.4-16.0/flink-shaded-jackson-2.13.4-16.0.pom]

 


was (Author: sergiosp):
Hello [~chesnay] 

Noticed the flink-shaded-jackson v2.13.4-16.0 already has the fix (it uses 
jackson's own snakeyaml version which is 1.31). Could we upgrade flink-shaded 
version in flink version 1.16.0 to use 2.13.4-16.0?

[https://github.com/apache/flink/blob/release-1.16.0-rc2/pom.xml#L125]

```

        16.0
        2.13.4

```

 

ref:

[https://repo1.maven.org/maven2/com/fasterxml/jackson/dataformat/jackson-dataformat-yaml/2.13.4/jackson-dataformat-yaml-2.13.4.pom]

[https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-jackson/2.13.4-16.0/flink-shaded-jackson-2.13.4-16.0.pom]

 

> CVE-2022-25857 on flink-shaded
> --
>
> Key: FLINK-29235
> URL: https://issues.apache.org/jira/browse/FLINK-29235
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, BuildSystem / Shaded
>Affects Versions: 1.15.2
>Reporter: Sergio Sainz
>Assignee: Chesnay Schepler
>Priority: Major
>
> flink-shaded-version uses snakeyaml v1.29 which is vulnerable to 
> CVE-2022-25857
> Ref:
> https://nvd.nist.gov/vuln/detail/CVE-2022-25857
> https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-jackson/2.12.4-15.0/flink-shaded-jackson-2.12.4-15.0.pom
> https://github.com/apache/flink-shaded/blob/master/flink-shaded-jackson-parent/flink-shaded-jackson-2/pom.xml#L73



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-29235) CVE-2022-25857 on flink-shaded

2022-10-24 Thread Sergio Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623369#comment-17623369
 ] 

Sergio Sainz edited comment on FLINK-29235 at 10/24/22 7:45 PM:


Hello [~chesnay] 

Noticed the flink-shaded-jackson v2.13.4-16.0 already has the fix (it uses 
jackson's own snakeyaml version which is 1.31). Could we upgrade flink-shaded 
version in flink version 1.16.0 to use 2.13.4-16.0?

[https://github.com/apache/flink/blob/release-1.16.0-rc2/pom.xml#L125]

{{16.0    
2.13.4}}

 

Ref:

[https://repo1.maven.org/maven2/com/fasterxml/jackson/dataformat/jackson-dataformat-yaml/2.13.4/jackson-dataformat-yaml-2.13.4.pom]

[https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-jackson/2.13.4-16.0/flink-shaded-jackson-2.13.4-16.0.pom]

 


was (Author: sergiosp):
Hello [~chesnay] 

Noticed the flink-shaded-jackson v2.13.4-16.0 already has the fix (it uses 
jackson's own snakeyaml version which is 1.31). Could we upgrade flink-shaded 
version in flink version 1.16.0 to use 2.13.4-16.0?

[https://github.com/apache/flink/blob/release-1.16.0-rc2/pom.xml#L125]


{{16.0    
}}{{2.13.4}}

 

Ref:

[https://repo1.maven.org/maven2/com/fasterxml/jackson/dataformat/jackson-dataformat-yaml/2.13.4/jackson-dataformat-yaml-2.13.4.pom]

[https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-jackson/2.13.4-16.0/flink-shaded-jackson-2.13.4-16.0.pom]

 

> CVE-2022-25857 on flink-shaded
> --
>
> Key: FLINK-29235
> URL: https://issues.apache.org/jira/browse/FLINK-29235
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, BuildSystem / Shaded
>Affects Versions: 1.15.2
>Reporter: Sergio Sainz
>Assignee: Chesnay Schepler
>Priority: Major
>
> flink-shaded-version uses snakeyaml v1.29 which is vulnerable to 
> CVE-2022-25857
> Ref:
> https://nvd.nist.gov/vuln/detail/CVE-2022-25857
> https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-jackson/2.12.4-15.0/flink-shaded-jackson-2.12.4-15.0.pom
> https://github.com/apache/flink-shaded/blob/master/flink-shaded-jackson-parent/flink-shaded-jackson-2/pom.xml#L73



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29235) CVE-2022-25857 on flink-shaded

2022-10-24 Thread Sergio Sainz (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623369#comment-17623369
 ] 

Sergio Sainz commented on FLINK-29235:
--

Hello [~chesnay] 

Noticed the flink-shaded-jackson v2.13.4-16.0 already has the fix (it uses 
jackson's own snakeyaml version which is 1.31). Could we upgrade flink-shaded 
version in flink version 1.16.0 to use 2.13.4-16.0?

[https://github.com/apache/flink/blob/release-1.16.0-rc2/pom.xml#L125]

```

        16.0
        2.13.4

```

 

ref:

[https://repo1.maven.org/maven2/com/fasterxml/jackson/dataformat/jackson-dataformat-yaml/2.13.4/jackson-dataformat-yaml-2.13.4.pom]

[https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-jackson/2.13.4-16.0/flink-shaded-jackson-2.13.4-16.0.pom]

 

> CVE-2022-25857 on flink-shaded
> --
>
> Key: FLINK-29235
> URL: https://issues.apache.org/jira/browse/FLINK-29235
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, BuildSystem / Shaded
>Affects Versions: 1.15.2
>Reporter: Sergio Sainz
>Assignee: Chesnay Schepler
>Priority: Major
>
> flink-shaded-version uses snakeyaml v1.29 which is vulnerable to 
> CVE-2022-25857
> Ref:
> https://nvd.nist.gov/vuln/detail/CVE-2022-25857
> https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-jackson/2.12.4-15.0/flink-shaded-jackson-2.12.4-15.0.pom
> https://github.com/apache/flink-shaded/blob/master/flink-shaded-jackson-parent/flink-shaded-jackson-2/pom.xml#L73



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] qingwei91 commented on pull request #20140: [Flink 16024][Connector][JDBC] Support FilterPushdown

2022-10-24 Thread GitBox


qingwei91 commented on PR #20140:
URL: https://github.com/apache/flink/pull/20140#issuecomment-1289455651

   Hi @libenchao , thank you very much for your review  
   
   I've addressed all of your concern. On the TablePlanTest, do you mind to 
check if that's how it supposed to work? I don't think I understand internal 
good enough to judge


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-29529) Update flink version in flink-python-example of flink k8s operator

2022-10-24 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-29529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-29529.
--
Resolution: Fixed

merged to main a978375daeefbf23b548aab940a7ad3c366aa661

> Update flink version in flink-python-example of flink k8s operator
> --
>
> Key: FLINK-29529
> URL: https://issues.apache.org/jira/browse/FLINK-29529
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: Biao Geng
>Assignee: Sriram Ganesh
>Priority: Minor
>
> Currently, we hardcoded the version of both flink image and pyflink pip 
> package as 1.15.0 in the example's Dockerfile. It is not the best practice as 
> the flink has new 1.15.x releases.
> We had better do following improvements:
> {{FROM flink:1.15.0 -> FROM flink:1.15}}
> {{RUN pip3 install apache-flink==1.15.0 -> RUN pip3 install 
> "apache-flink>=1.15.0,<1.16.0"}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-connector-dynamodb] dannycranmer commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-24 Thread GitBox


dannycranmer commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1003629002


##
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/util/PrimaryKeyBuilder.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.util;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.dynamodb.sink.InvalidRequestException;
+import org.apache.flink.util.CollectionUtil;
+
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/** Helper to construct primary(composite) key for a DynamoDB request. */
+@Internal
+public class PrimaryKeyBuilder {
+
+public static String build(List pKeys, WriteRequest request) {
+
+if (CollectionUtil.isNullOrEmpty(pKeys)) {
+// fake key, because no dynamodb partition key or sort key 
provided. Using UUID should be safe
+// here, as we have at most 25 elements per batch
+return UUID.randomUUID().toString();
+} else {
+Map requestItems = 
getRequestItems(request);
+
+StringBuilder builder = new StringBuilder();
+for (String keyName : pKeys) {
+AttributeValue value = requestItems.get(keyName);
+
+if (value == null) {
+throw new InvalidRequestException(
+"Request " + request.toString() + " does not 
contain pKey " + keyName);
+}
+
+builder.append(getKeyValue(value));
+}
+
+return builder.toString();
+}
+}
+
+/**
+ * Returns string value of a partition key attribute. Each primary key 
attribute must be defined
+ * as type String, Number, or binary as per DynamoDB specification.
+ */
+private static String getKeyValue(AttributeValue value) {
+StringBuilder builder = new StringBuilder();
+
+if (value.n() != null) {
+builder.append(value.n());
+}
+
+if (value.s() != null) {
+builder.append(value.s());
+}
+
+if (value.b() != null) {
+builder.append(value.b().asUtf8String());
+}
+
+return builder.toString();

Review Comment:
   We already validate that the `AttributeValue` is not null above. How about 
we validate the return of this method is not empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora merged pull request #408: Added improvements on the flink python example Dockerfile

2022-10-24 Thread GitBox


gyfora merged PR #408:
URL: https://github.com/apache/flink-kubernetes-operator/pull/408


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-dynamodb] dannycranmer commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-24 Thread GitBox


dannycranmer commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1003626614


##
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkBuilder.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Builder to construct {@link DynamoDbSink}.
+ *
+ * The following example shows the minimum setup to create a {@link 
DynamoDbSink} that writes
+ * records into DynamoDb
+ *
+ * {@code
+ * private static class DummyDynamoDbElementConverter implements 
ElementConverter {
+ *
+ * @Override
+ * public DynamoDbWriteRequest apply(String s) {
+ * final Map item = new HashMap<>();
+ * item.put("your-key", AttributeValue.builder().s(s).build());
+ * return new DynamoDbWriteRequest(
+ *   WriteRequest.builder()
+ *   .putRequest(PutRequest.builder()
+ *   .item(item)
+ *   .build())
+ *   .build()
+ *   );
+ * }
+ * }
+ * DynamoDbSink dynamoDbSink = DynamoDbSink.builder()
+ *  .setElementConverter(new 
DummyDynamoDbElementConverter())
+ *  
.setDestinationTableName("your-table-name")
+ *   .build();
+ * }
+ *
+ * If the following parameters are not set in this builder, the following 
defaults will be used:
+ *
+ * 
+ *   {@code maxBatchSize} will be 25
+ *   {@code maxInFlightRequests} will be 50
+ *   {@code maxBufferedRequests} will be 1
+ *   {@code maxBatchSizeInBytes} will be 16 MB i.e. {@code 16 * 1000 * 
1000}
+ *   {@code maxTimeInBufferMS} will be 5000ms
+ *   {@code maxRecordSizeInBytes} will be 400 KB i.e. {@code 400 * 1000}
+ *   {@code failOnError} will be false
+ *   {@code destinationTableName} destination table for the sink
+ *   {@code overwriteByPKeys} will be empty meaning no records 
deduplication will be performed
+ *   by the batch sink
+ * 
+ *
+ * @param  type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class DynamoDbSinkBuilder
+extends AsyncSinkBaseBuilder> {
+
+private static final int DEFAULT_MAX_BATCH_SIZE = 25;
+private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 1;
+private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 16 * 1000 * 1000;
+private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 400 * 1000;
+private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+
+private boolean failOnError;
+private Properties dynamodbClientProperties;
+
+private ElementConverter elementConverter;
+private String tableName;
+
+private List overwriteByPKeys;
+
+public DynamoDbSinkBuilder setDynamoDbProperties(Properties 
properties) {
+this.dynamodbClientProperties = properties;
+return this;
+}
+
+public DynamoDbSinkBuilder setElementConverter(
+ElementConverter elementConverter) {
+this.elementConverter = elementConverter;
+return this;
+}
+
+/** Destination DynamoDB table name for the sink to write to. */
+public DynamoDbSinkBuilder setDestinationTableName(String 
tableName) {
+this.tableName = tableName;
+return this;
+}
+
+/**
+ * @param overwriteByPKeys provide partition key and (optionally) sort key 
name if you want to
+ * bypass no duplication limitation of single batch write request. 
Batching DynamoDB sink
+ * will drop request items in the buffer if their 

[jira] [Commented] (FLINK-29731) Protobuf-confluent format

2022-10-24 Thread gang ye (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623316#comment-17623316
 ] 

gang ye commented on FLINK-29731:
-

Is this for adding ConfluentSchemaRegistryCoder for Protobuf (like avro 
https://github.com/apache/flink/blob/master/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java#L36)
 

> Protobuf-confluent format
> -
>
> Key: FLINK-29731
> URL: https://issues.apache.org/jira/browse/FLINK-29731
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.17.0
> Environment: Confluent kafka with schema registry
>Reporter: Krish Narukulla
>Priority: Major
>
> Flink needs to integrate with Confluent for protobuf format, to able to 
> leverage schema registry for schema versions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] morhidi commented on pull request #409: [FLINK-29708] Convert CR Error field to JSON with enriched exception …

2022-10-24 Thread GitBox


morhidi commented on PR #409:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/409#issuecomment-1289362006

   I tested it and it almost works great, apart from the minor things, Gyula 
mentioned. There'll be probably issues with JSON encoding: I've tested it with 
the following invalid CR (missing volumes):
   ```
   

   #  Licensed to the Apache Software Foundation (ASF) under one
   #  or more contributor license agreements.  See the NOTICE file
   #  distributed with this work for additional information
   #  regarding copyright ownership.  The ASF licenses this file
   #  to you under the Apache License, Version 2.0 (the
   #  "License"); you may not use this file except in compliance
   #  with the License.  You may obtain a copy of the License at
   #
   #  http://www.apache.org/licenses/LICENSE-2.0
   #
   #  Unless required by applicable law or agreed to in writing, software
   #  distributed under the License is distributed on an "AS IS" BASIS,
   #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   #  See the License for the specific language governing permissions and
   # limitations under the License.
   

   
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
 name: basic-example
   spec:
 image: flink:1.15
 flinkVersion: v1_15
 flinkConfiguration:
   taskmanager.numberOfTaskSlots: "2"
 serviceAccount: flink
 podTemplate:
   apiVersion: v1
   kind: Pod
   metadata:
 name: pod-template
   spec:
 containers:
   - name: flink-main-container
 volumeMounts:
   - mountPath: /opt/flink/log
 name: flink-logs
   - mountPath: /opt/flink/downloads
 name: downloads
 jobManager:
   resource:
 memory: "2048m"
 cpu: 1
 taskManager:
   resource:
 memory: "2048m"
 cpu: 1
 job:
   jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
   parallelism: 2
   upgradeMode: stateless
   ```
   
   It resulted in an string, that might not be a valid JSON:
   ```
   
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
 Could not create Kubernetes cluster 
\"basic-example\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could
 not create Kubernetes cluster 
\"basic-example\"."},{"type":"io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure
 executing: POST at: 
https://127.0.0.1:53846/apis/apps/v1/namespaces/default/deployments.
 Message: Deployment.apps \"basic-example\" is invalid: 
[spec.template.spec.containers[0].volumeMounts[0].name:
 Not found: \"flink-logs\", 
spec.template.spec.containers[0].volumeMounts[1].name:
 Not found: \"downloads\"]. Received status: Status(apiVersion=v1, code=422,
 
details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[0].name,
 message=Not found: \"flink-logs\", reason=FieldValueNotFound, 
additionalProperties={}),
 StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, 
message=Not
 found: \"downloads\", reason=FieldValueNotFound, additionalProperties={})],
 group=apps, kind=Deployment, name=basic-example, retryAfterSeconds=null, 
uid=null,
 additionalProperties={}), kind=Status, message=Deployment.apps 
\"basic-example\"
 is invalid: [spec.template.spec.containers[0].volumeMounts[0].name: Not 
found:
   \"flink-logs\", spec.template.spec.containers[0].volumeMounts[1].name: 
Not found:
   \"downloads\"], metadata=ListMeta(_continue=null, 
remainingItemCount=null, resourceVersion=null,
   selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, 
additionalProperties={})."}]}
   ```
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-dynamodb] YuriGusev commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-24 Thread GitBox


YuriGusev commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1003539179


##
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java:
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+import 
org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbExceptionUtils;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More 
details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More 
details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * The {@link DynamoDbAsyncClient} used here may be configured in the 
standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class DynamoDbSinkWriter extends AsyncSinkWriter {
+private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbSinkWriter.class);
+
+/* A counter for the total number of records that have encountered an 
error during put */
+private final Counter numRecordsSendErrorsCounter;
+
+/* The sink writer metric group */
+private final SinkWriterMetricGroup metrics;
+
+private final DynamoDbAsyncClient client;
+private final boolean failOnError;
+private final String tableName;
+
+private List overwriteByPKeys;

Review Comment:
   sorry missed this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-dynamodb] YuriGusev commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-24 Thread GitBox


YuriGusev commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1003539179


##
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java:
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+import 
org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbExceptionUtils;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More 
details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More 
details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * The {@link DynamoDbAsyncClient} used here may be configured in the 
standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class DynamoDbSinkWriter extends AsyncSinkWriter {
+private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbSinkWriter.class);
+
+/* A counter for the total number of records that have encountered an 
error during put */
+private final Counter numRecordsSendErrorsCounter;
+
+/* The sink writer metric group */
+private final SinkWriterMetricGroup metrics;
+
+private final DynamoDbAsyncClient client;
+private final boolean failOnError;
+private final String tableName;
+
+private List overwriteByPKeys;

Review Comment:
   sorry missed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-dynamodb] YuriGusev commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-24 Thread GitBox


YuriGusev commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1003536293


##
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java:
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+import 
org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbExceptionUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More 
details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More 
details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * The {@link DynamoDbAsyncClient} used here may be configured in the 
standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class DynamoDbSinkWriter extends AsyncSinkWriter {
+private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbSinkWriter.class);
+
+/* A counter for the total number of records that have encountered an 
error during put */
+private final Counter numRecordsOutErrorsCounter;
+
+/* The sink writer metric group */
+private final SinkWriterMetricGroup metrics;
+
+private final DynamoDbAsyncClient client;
+private final boolean failOnError;
+private final String tableName;
+
+private List overwriteByPKeys;
+
+public DynamoDbSinkWriter(
+ElementConverter elementConverter,
+InitContext context,
+int maxBatchSize,
+int maxInFlightRequests,
+int maxBufferedRequests,
+long maxBatchSizeInBytes,
+long maxTimeInBufferMS,
+long maxRecordSizeInBytes,
+boolean failOnError,
+String tableName,
+List overwriteByPKeys,
+Properties dynamoDbClientProperties,
+Collection> states) {
+super(
+elementConverter,
+context,
+maxBatchSize,
+maxInFlightRequests,
+maxBufferedRequests,
+maxBatchSizeInBytes,
+maxTimeInBufferMS,
+maxRecordSizeInBytes,
+states);
+this.failOnError = failOnError;
+this.tableName = tableName;
+this.overwriteByPKeys = overwriteByPKeys;
+this.metrics = context.metricGroup();
+this.numRecordsOutErrorsCounter = 
metrics.getNumRecordsOutErrorsCounter();
+this.client = AWSDynamoDbUtil.createClient(dynamoDbClientProperties);

Review Comment:
   I agree, I'll work on removing retries logic and using standard base class. 
It was developed on top of base classes when kinesis connector was actively 
worked on. We can rely on standard retry for now (can pick 

[GitHub] [flink-connector-dynamodb] YuriGusev commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-24 Thread GitBox


YuriGusev commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1003528260


##
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkBuilder.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Builder to construct {@link DynamoDbSink}.
+ *
+ * The following example shows the minimum setup to create a {@link 
DynamoDbSink} that writes
+ * records into DynamoDb
+ *
+ * {@code
+ * private static class DummyDynamoDbElementConverter implements 
ElementConverter {
+ *
+ * @Override
+ * public DynamoDbWriteRequest apply(String s) {
+ * final Map item = new HashMap<>();
+ * item.put("your-key", AttributeValue.builder().s(s).build());
+ * return new DynamoDbWriteRequest(
+ *   WriteRequest.builder()
+ *   .putRequest(PutRequest.builder()
+ *   .item(item)
+ *   .build())
+ *   .build()
+ *   );
+ * }
+ * }
+ * DynamoDbSink dynamoDbSink = DynamoDbSink.builder()
+ *  .setElementConverter(new 
DummyDynamoDbElementConverter())
+ *  
.setDestinationTableName("your-table-name")
+ *   .build();
+ * }
+ *
+ * If the following parameters are not set in this builder, the following 
defaults will be used:
+ *
+ * 
+ *   {@code maxBatchSize} will be 25
+ *   {@code maxInFlightRequests} will be 50
+ *   {@code maxBufferedRequests} will be 1
+ *   {@code maxBatchSizeInBytes} will be 16 MB i.e. {@code 16 * 1000 * 
1000}
+ *   {@code maxTimeInBufferMS} will be 5000ms
+ *   {@code maxRecordSizeInBytes} will be 400 KB i.e. {@code 400 * 1000}
+ *   {@code failOnError} will be false
+ *   {@code destinationTableName} destination table for the sink
+ *   {@code overwriteByPKeys} will be empty meaning no records 
deduplication will be performed
+ *   by the batch sink
+ * 
+ *
+ * @param  type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class DynamoDbSinkBuilder
+extends AsyncSinkBaseBuilder> {
+
+private static final int DEFAULT_MAX_BATCH_SIZE = 25;
+private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 1;
+private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 16 * 1000 * 1000;
+private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 400 * 1000;
+private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+
+private boolean failOnError;
+private Properties dynamodbClientProperties;
+
+private ElementConverter elementConverter;
+private String tableName;
+
+private List overwriteByPKeys;
+
+public DynamoDbSinkBuilder setDynamoDbProperties(Properties 
properties) {
+this.dynamodbClientProperties = properties;
+return this;
+}
+
+public DynamoDbSinkBuilder setElementConverter(
+ElementConverter elementConverter) {
+this.elementConverter = elementConverter;
+return this;
+}
+
+/** Destination DynamoDB table name for the sink to write to. */
+public DynamoDbSinkBuilder setDestinationTableName(String 
tableName) {
+this.tableName = tableName;
+return this;
+}
+
+/**
+ * @param overwriteByPKeys provide partition key and (optionally) sort key 
name if you want to
+ * bypass no duplication limitation of single batch write request. 
Batching DynamoDB sink
+ * will drop request items in the buffer if their 

[GitHub] [flink-connector-dynamodb] YuriGusev commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-24 Thread GitBox


YuriGusev commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1003526661


##
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java:
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+import 
org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbExceptionUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More 
details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More 
details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * The {@link DynamoDbAsyncClient} used here may be configured in the 
standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class DynamoDbSinkWriter extends AsyncSinkWriter {
+private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbSinkWriter.class);
+
+/* A counter for the total number of records that have encountered an 
error during put */
+private final Counter numRecordsOutErrorsCounter;
+
+/* The sink writer metric group */
+private final SinkWriterMetricGroup metrics;
+
+private final DynamoDbAsyncClient client;
+private final boolean failOnError;
+private final String tableName;
+
+private List overwriteByPKeys;
+
+public DynamoDbSinkWriter(
+ElementConverter elementConverter,
+InitContext context,
+int maxBatchSize,
+int maxInFlightRequests,
+int maxBufferedRequests,
+long maxBatchSizeInBytes,
+long maxTimeInBufferMS,
+long maxRecordSizeInBytes,
+boolean failOnError,
+String tableName,
+List overwriteByPKeys,
+Properties dynamoDbClientProperties,
+Collection> states) {
+super(
+elementConverter,
+context,
+maxBatchSize,
+maxInFlightRequests,
+maxBufferedRequests,
+maxBatchSizeInBytes,
+maxTimeInBufferMS,
+maxRecordSizeInBytes,
+states);
+this.failOnError = failOnError;
+this.tableName = tableName;
+this.overwriteByPKeys = overwriteByPKeys;
+this.metrics = context.metricGroup();
+this.numRecordsOutErrorsCounter = 
metrics.getNumRecordsOutErrorsCounter();
+this.client = AWSDynamoDbUtil.createClient(dynamoDbClientProperties);
+}
+
+@Override
+protected void submitRequestEntries(
+List requestEntries,
+Consumer> requestResultConsumer) {
+
+TableRequestsContainer container = new 

[GitHub] [flink-connector-dynamodb] YuriGusev commented on a diff in pull request #1: [FLINK-24229][Connectors][DynamoDB] - Add AWS DynamoDB connector

2022-10-24 Thread GitBox


YuriGusev commented on code in PR #1:
URL: 
https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r1003526661


##
flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java:
##
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+import 
org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbExceptionUtils;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More 
details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More 
details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * The {@link DynamoDbAsyncClient} used here may be configured in the 
standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class DynamoDbSinkWriter extends AsyncSinkWriter {
+private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbSinkWriter.class);
+
+/* A counter for the total number of records that have encountered an 
error during put */
+private final Counter numRecordsOutErrorsCounter;
+
+/* The sink writer metric group */
+private final SinkWriterMetricGroup metrics;
+
+private final DynamoDbAsyncClient client;
+private final boolean failOnError;
+private final String tableName;
+
+private List overwriteByPKeys;
+
+public DynamoDbSinkWriter(
+ElementConverter elementConverter,
+InitContext context,
+int maxBatchSize,
+int maxInFlightRequests,
+int maxBufferedRequests,
+long maxBatchSizeInBytes,
+long maxTimeInBufferMS,
+long maxRecordSizeInBytes,
+boolean failOnError,
+String tableName,
+List overwriteByPKeys,
+Properties dynamoDbClientProperties,
+Collection> states) {
+super(
+elementConverter,
+context,
+maxBatchSize,
+maxInFlightRequests,
+maxBufferedRequests,
+maxBatchSizeInBytes,
+maxTimeInBufferMS,
+maxRecordSizeInBytes,
+states);
+this.failOnError = failOnError;
+this.tableName = tableName;
+this.overwriteByPKeys = overwriteByPKeys;
+this.metrics = context.metricGroup();
+this.numRecordsOutErrorsCounter = 
metrics.getNumRecordsOutErrorsCounter();
+this.client = AWSDynamoDbUtil.createClient(dynamoDbClientProperties);
+}
+
+@Override
+protected void submitRequestEntries(
+List requestEntries,
+Consumer> requestResultConsumer) {
+
+TableRequestsContainer container = new 

[GitHub] [flink] Samrat002 commented on pull request #21135: [FLINK-29733][hotfix/test] fix test error in flink-connector-hive

2022-10-24 Thread GitBox


Samrat002 commented on PR #21135:
URL: https://github.com/apache/flink/pull/21135#issuecomment-1289169354

   test failed due to 
[FLINK-24119](https://issues.apache.org/jira/browse/FLINK-24119)
   
   Please review !
   @HuangXingBo 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-24119) KafkaITCase.testTimestamps fails due to "Topic xxx already exist"

2022-10-24 Thread Samrat Deb (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623232#comment-17623232
 ] 

Samrat Deb commented on FLINK-24119:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=42354=logs=219f6d90-20a2-5863-7c1b-c80377a1018f=20186858-1485-5059-c9c6-446952519524=37146

> KafkaITCase.testTimestamps fails due to "Topic xxx already exist"
> -
>
> Key: FLINK-24119
> URL: https://issues.apache.org/jira/browse/FLINK-24119
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.14.0, 1.15.0, 1.16.0
>Reporter: Xintong Song
>Assignee: Qingsheng Ren
>Priority: Critical
>  Labels: auto-deprioritized-critical, test-stability
> Fix For: 1.16.1
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=23328=logs=c5f0071e-1851-543e-9a45-9ac140befc32=15a22db7-8faa-5b34-3920-d33c9f0ca23c=7419
> {code}
> Sep 01 15:53:20 [ERROR] Tests run: 23, Failures: 1, Errors: 0, Skipped: 0, 
> Time elapsed: 162.65 s <<< FAILURE! - in 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase
> Sep 01 15:53:20 [ERROR] testTimestamps  Time elapsed: 23.237 s  <<< FAILURE!
> Sep 01 15:53:20 java.lang.AssertionError: Create test topic : tstopic failed, 
> org.apache.kafka.common.errors.TopicExistsException: Topic 'tstopic' already 
> exists.
> Sep 01 15:53:20   at org.junit.Assert.fail(Assert.java:89)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.createTestTopic(KafkaTestEnvironmentImpl.java:226)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.createTestTopic(KafkaTestEnvironment.java:112)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaTestBase.createTestTopic(KafkaTestBase.java:212)
> Sep 01 15:53:20   at 
> org.apache.flink.streaming.connectors.kafka.KafkaITCase.testTimestamps(KafkaITCase.java:191)
> Sep 01 15:53:20   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> Sep 01 15:53:20   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> Sep 01 15:53:20   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> Sep 01 15:53:20   at java.lang.reflect.Method.invoke(Method.java:498)
> Sep 01 15:53:20   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> Sep 01 15:53:20   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
> Sep 01 15:53:20   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
> Sep 01 15:53:20   at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> Sep 01 15:53:20   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29545) kafka consuming stop when trigger first checkpoint

2022-10-24 Thread xiaogang zhou (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623212#comment-17623212
 ] 

xiaogang zhou commented on FLINK-29545:
---

[~pnowojski] Hi Master, I could clearly find the netty client side has quite a 
few successful netty listener call back(10 sec periodic heartbeat), but the 
netty server side could trigger the idle timeout for 30 second. so I think the 
heart beat for the data communication is also necessary... I have created a PR, 
please kindly review...

> kafka consuming stop when trigger first checkpoint
> --
>
> Key: FLINK-29545
> URL: https://issues.apache.org/jira/browse/FLINK-29545
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Runtime / Network
>Affects Versions: 1.13.3
>Reporter: xiaogang zhou
>Priority: Critical
>  Labels: pull-request-available
> Attachments: backpressure 100 busy 0.png, task acknowledge na.png, 
> task dag.png
>
>
> the task dag is like attached file. the task is started to consume from 
> earliest offset, it will stop when the first checkpoint triggers.
>  
> is it normal?, for sink is busy 0 and the second operator has 100 backpressure
>  
> and check the checkpoint summary, we can find some of the sub task is n/a.
> I tried to debug this issue and found in the 
> triggerCheckpointAsync , the 
> triggerCheckpointAsyncInMailbox took  a lot time to call
>  
>  
> looks like this has something to do with 
> logCheckpointProcessingDelay, Has any fix on this issue?
>  
>  
> can anybody help me on this issue?
>  
>  
>  
>  
> thanks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-web] godfreyhe commented on a diff in pull request #574: Announcement blogpost for the 1.16 release

2022-10-24 Thread GitBox


godfreyhe commented on code in PR #574:
URL: https://github.com/apache/flink-web/pull/574#discussion_r1003354068


##
_config.yml:
##
@@ -244,6 +260,10 @@ component_releases:
 
 release_archive:
 flink:
+  -
+version_short: "1.16"
+version_long: 1.16.0
+release_date: 2022-10-15

Review Comment:
   yes, I will update the value if the date is confirmed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RyanSkraba commented on a diff in pull request #20267: [FLINK-28542][tests][JUnit5 Migration] FileSystemBehaviorTestSuite

2022-10-24 Thread GitBox


RyanSkraba commented on code in PR #20267:
URL: https://github.com/apache/flink/pull/20267#discussion_r1003342134


##
flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemBehaviorTest.java:
##
@@ -23,26 +23,25 @@
 import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 
-import org.junit.Rule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.io.TempDir;
 
 /** Behavior tests for Flink's {@link LocalFileSystem}. */
-public class LocalFileSystemBehaviorTest extends FileSystemBehaviorTestSuite {
+class LocalFileSystemBehaviorTest extends FileSystemBehaviorTestSuite {
 
-@Rule public final TemporaryFolder tmp = new TemporaryFolder();
+@TempDir private java.nio.file.Path tmp;

Review Comment:
   See above -- I didn't change this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RyanSkraba commented on a diff in pull request #20267: [FLINK-28542][tests][JUnit5 Migration] FileSystemBehaviorTestSuite

2022-10-24 Thread GitBox


RyanSkraba commented on code in PR #20267:
URL: https://github.com/apache/flink/pull/20267#discussion_r1003341622


##
flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HdfsBehaviorTest.java:
##
@@ -47,19 +43,17 @@ public class HdfsBehaviorTest extends 
FileSystemBehaviorTestSuite {
 
 // 
 
-@BeforeClass
-public static void verifyOS() {
-Assume.assumeTrue(
-"HDFS cluster cannot be started on Windows without 
extensions.",
-!OperatingSystem.isWindows());
+@BeforeAll
+static void verifyOS() {
+assumeThat(OperatingSystem.isWindows())
+.describedAs("HDFS cluster cannot be started on Windows 
without extensions.")
+.isFalse();
 }
 
-@BeforeClass
-public static void createHDFS() throws Exception {
-final File baseDir = TMP.newFolder();
-
+@BeforeAll
+static void createHDFS(@TempDir java.nio.file.Path tmp) throws Exception {

Review Comment:
   I made this change because of the `getAbsolutePath`, but I'd argue that it's 
strictly not necessary or desireable to avoid fully qualified classes!  Most 
uses of `@TempDir` are on `java.nio` classes I believe!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] ferenc-csaky commented on pull request #21127: [FLINK-29707][cli] Fix possible comparator violation for "flink list"

2022-10-24 Thread GitBox


ferenc-csaky commented on PR #21127:
URL: https://github.com/apache/flink/pull/21127#issuecomment-1289067927

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-25802) OverWindow in batch mode failed

2022-10-24 Thread Runkang He (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623197#comment-17623197
 ] 

Runkang He commented on FLINK-25802:


Hi, [~TsReaper] is there some update on this issue?

> OverWindow in batch mode failed
> ---
>
> Key: FLINK-25802
> URL: https://issues.apache.org/jira/browse/FLINK-25802
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: Zoyo Pei
>Priority: Major
>
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> DataStream userStream = env
> .fromElements(
> Row.of(LocalDateTime.parse("2021-08-21T13:00:00"), 1, 
> "Alice"),
> Row.of(LocalDateTime.parse("2021-08-21T13:05:00"), 2, "Bob"),
> Row.of(LocalDateTime.parse("2021-08-21T13:10:00"), 2, "Bob"))
> .returns(
> Types.ROW_NAMED(
> new String[]{"ts", "uid", "name"},
> Types.LOCAL_DATE_TIME, Types.INT, Types.STRING));
> tEnv.createTemporaryView(
> "UserTable",
> userStream,
> Schema.newBuilder()
> .column("ts", DataTypes.TIMESTAMP(3))
> .column("uid", DataTypes.INT())
> .column("name", DataTypes.STRING())
> .watermark("ts", "ts - INTERVAL '1' SECOND")
> .build());
> String statement = "SELECT name, ts, COUNT(name) OVER w AS cnt FROM UserTable 
> " +
> "WINDOW w AS (" +
> " PARTITION BY name" +
> " ORDER BY ts" +
> " RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW" +
> ")";
> tEnv.executeSql(statement).print();
>  {code}
>  
> {code:java}
> /* 1 */
> /* 2 */      public class RangeBoundComparator$38 implements 
> org.apache.flink.table.runtime.generated.RecordComparator {
> /* 3 */
> /* 4 */        private final Object[] references;
> /* 5 */        
> /* 6 */
> /* 7 */        public RangeBoundComparator$38(Object[] references) {
> /* 8 */          this.references = references;
> /* 9 */          
> /* 10 */          
> /* 11 */        }
> /* 12 */
> /* 13 */        @Override
> /* 14 */        public int compare(org.apache.flink.table.data.RowData in1, 
> org.apache.flink.table.data.RowData in2) {
> /* 15 */          
> /* 16 */                  org.apache.flink.table.data.TimestampData field$39;
> /* 17 */                  boolean isNull$39;
> /* 18 */                  org.apache.flink.table.data.TimestampData field$40;
> /* 19 */                  boolean isNull$40;
> /* 20 */                  isNull$39 = in1.isNullAt(0);
> /* 21 */                  field$39 = null;
> /* 22 */                  if (!isNull$39) {
> /* 23 */                    field$39 = in1.getTimestamp(0, 3);
> /* 24 */                  }
> /* 25 */                  isNull$40 = in2.isNullAt(0);
> /* 26 */                  field$40 = null;
> /* 27 */                  if (!isNull$40) {
> /* 28 */                    field$40 = in2.getTimestamp(0, 3);
> /* 29 */                  }
> /* 30 */                  if (isNull$39 && isNull$40) {
> /* 31 */                     return 1;
> /* 32 */                  } else if (isNull$39 || isNull$40) {
> /* 33 */                     return -1;
> /* 34 */                  } else {
> /* 35 */                     
> /* 36 */                            
> /* 37 */                            long result$41;
> /* 38 */                            boolean isNull$41;
> /* 39 */                            long result$42;
> /* 40 */                            boolean isNull$42;
> /* 41 */                            boolean isNull$43;
> /* 42 */                            long result$44;
> /* 43 */                            boolean isNull$45;
> /* 44 */                            boolean result$46;
> /* 45 */                            isNull$41 = (java.lang.Long) field$39 == 
> null;
> /* 46 */                            result$41 = -1L;
> /* 47 */                            if (!isNull$41) {
> /* 48 */                              result$41 = (java.lang.Long) field$39;
> /* 49 */                            }
> /* 50 */                            isNull$42 = (java.lang.Long) field$40 == 
> null;
> /* 51 */                            result$42 = -1L;
> /* 52 */                            if (!isNull$42) {
> /* 53 */                              result$42 = (java.lang.Long) field$40;
> /* 54 */                            }
> /* 55 */                            
> /* 56 */                            
> /* 57 */                            
> /* 58 */                            
> /* 59 */                       

[GitHub] [flink] RyanSkraba commented on a diff in pull request #20267: [FLINK-28542][tests][JUnit5 Migration] FileSystemBehaviorTestSuite

2022-10-24 Thread GitBox


RyanSkraba commented on code in PR #20267:
URL: https://github.com/apache/flink/pull/20267#discussion_r1003339435


##
flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java:
##
@@ -70,12 +60,30 @@ public class AzureFileSystemBehaviorITCase extends 
FileSystemBehaviorTestSuite {
 
 private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
 
-// Azure Blob Storage defaults to https only storage accounts. We check if 
http support has been
-// enabled on a best effort basis and test http if so.
-@Parameterized.Parameters(name = "Scheme = {0}")
-public static List parameters() throws IOException {
-boolean httpsOnly = isHttpsTrafficOnly();
-return httpsOnly ? Arrays.asList("wasbs") : Arrays.asList("wasb", 
"wasbs");
+/**
+ * Azure Blob Storage defaults to https only storage accounts, tested in 
the base class.
+ *
+ * This nested class repeats the tests with http support, but only if a 
best effort check on
+ * https support succeeds.
+ */
+static class HttpSupportAzureFileSystemBehaviorITCase extends 
AzureFileSystemBehaviorITCase {

Review Comment:
   I don't think this makes a significant difference to warrant the change!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] RyanSkraba commented on a diff in pull request #20267: [FLINK-28542][tests][JUnit5 Migration] FileSystemBehaviorTestSuite

2022-10-24 Thread GitBox


RyanSkraba commented on code in PR #20267:
URL: https://github.com/apache/flink/pull/20267#discussion_r1003338793


##
flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java:
##
@@ -85,9 +93,9 @@ private static boolean isHttpsTrafficOnly() throws 
IOException {
 return true;
 }
 
-Assume.assumeTrue(
-"Azure storage account not configured, skipping test...",
-!StringUtils.isNullOrWhitespaceOnly(ACCOUNT));
+assumeThat(ACCOUNT)
+.describedAs("Azure storage account not configured, skipping 
test...")
+.isNotBlank();

Review Comment:
   I rewrote the logic here so that all of the `assumeThat` are more explicit.  
I'm pretty sure it's equivalent, but I'd appreciate a bit of extra review on 
this change!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   3   >