[ https://issues.apache.org/jira/browse/FLINK-35697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862708#comment-17862708 ]
Ahmed Hamdy commented on FLINK-35697:
-------------------------------------
[~m.orazow] Thanks for picking it up. [~jingge], could you please assign the
ticket to Muhammet?
> Release Testing: Verify FLIP-451 Introduce timeout configuration to AsyncSink
> -----------------------------------------------------------------------------
>
> Key: FLINK-35697
> URL: https://issues.apache.org/jira/browse/FLINK-35697
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Common
> Reporter: Ahmed Hamdy
> Priority: Blocker
> Fix For: 1.20.0
>
>
> h2. Description
> In FLIP-451 we added a timeout configuration to {{AsyncSinkWriter}}, with a
> default value of 10 minutes and {{failOnTimeout}} defaulting to false.
> We need to test the new feature on different levels:
> - Functional Testing
> - Performance Testing
> - Regression Testing
> h2. Common Utils
> The feature affects the abstract {{AsyncSinkWriter}} class, so we need a
> concrete sink implementation for our tests. Any implementation where we can
> track delivery of elements is acceptable; an example:
> {code}
> // Assumed to be a nested class of DiscardingTestAsyncSink<T>, so the type
> // parameter T and the LOG field come from the enclosing class.
> class DiscardingElementWriter extends AsyncSinkWriter<T, String> {
>
>     SeparateThreadExecutor executor =
>             new SeparateThreadExecutor(r -> new Thread(r, "DiscardingElementWriter"));
>
>     public DiscardingElementWriter(
>             Sink.InitContext context,
>             AsyncSinkWriterConfiguration configuration,
>             Collection<BufferedRequestState<String>> bufferedRequestStates) {
>         super(
>                 (element, context1) -> element.toString(),
>                 context,
>                 configuration,
>                 bufferedRequestStates);
>     }
>
>     @Override
>     protected long getSizeInBytes(String requestEntry) {
>         return requestEntry.length();
>     }
>
>     @Override
>     protected void submitRequestEntries(
>             List<String> requestEntries, ResultHandler<String> resultHandler) {
>         executor.execute(
>                 () -> {
>                     // Simulate a slow destination with a random delay of up to 5 seconds.
>                     long delayMillis = new Random().nextInt(5000);
>                     try {
>                         Thread.sleep(delayMillis);
>                     } catch (InterruptedException ignored) {
>                     }
>                     for (String entry : requestEntries) {
>                         LOG.info("Discarding {} after {} ms", entry, delayMillis);
>                     }
>                     resultHandler.complete();
>                 });
>     }
> }
> {code}
> We will also need a simple Flink job that writes data using the sink:
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.fromSequence(0, 100)
>         .map(Object::toString)
>         // requestTimeoutMS and failOnTimeout are chosen per test scenario below
>         .sinkTo(new DiscardingTestAsyncSink<>(requestTimeoutMS, failOnTimeout));
> {code}
> We can use minimal values for batch size and in-flight requests to increase
> the number of requests that are subject to the timeout:
> {code}
> public class DiscardingTestAsyncSink<T> extends AsyncSinkBase<T, String> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(DiscardingTestAsyncSink.class);
>
>     public DiscardingTestAsyncSink(long requestTimeoutMS, boolean failOnTimeout) {
>         super(
>                 (element, context) -> element.toString(),
>                 1, // maxBatchSize
>                 1, // maxInFlightRequests
>                 10, // maxBufferedRequests
>                 1000L, // maxBatchSizeInBytes
>                 100, // maxTimeInBufferMS
>                 500L, // maxRecordSizeInBytes
>                 requestTimeoutMS,
>                 failOnTimeout);
>     }
>
>     @Override
>     public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
>         return new DiscardingElementWriter(
>                 new InitContextWrapper(context),
>                 AsyncSinkWriterConfiguration.builder()
>                         .setMaxBatchSize(this.getMaxBatchSize())
>                         .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
>                         .setMaxInFlightRequests(this.getMaxInFlightRequests())
>                         .setMaxBufferedRequests(this.getMaxBufferedRequests())
>                         .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
>                         .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
>                         .setFailOnTimeout(this.getFailOnTimeout())
>                         .setRequestTimeoutMS(this.getRequestTimeoutMS())
>                         .build(),
>                 Collections.emptyList());
>     }
>
>     @Override
>     public StatefulSinkWriter<T, BufferedRequestState<String>> restoreWriter(
>             WriterInitContext context,
>             Collection<BufferedRequestState<String>> recoveredState)
>             throws IOException {
>         return new DiscardingElementWriter(
>                 new InitContextWrapper(context),
>                 AsyncSinkWriterConfiguration.builder()
>                         .setMaxBatchSize(this.getMaxBatchSize())
>                         .setMaxBatchSizeInBytes(this.getMaxBatchSizeInBytes())
>                         .setMaxInFlightRequests(this.getMaxInFlightRequests())
>                         .setMaxBufferedRequests(this.getMaxBufferedRequests())
>                         .setMaxTimeInBufferMS(this.getMaxTimeInBufferMS())
>                         .setMaxRecordSizeInBytes(this.getMaxRecordSizeInBytes())
>                         .setFailOnTimeout(this.getFailOnTimeout())
>                         .setRequestTimeoutMS(this.getRequestTimeoutMS())
>                         .build(),
>                 recoveredState);
>     }
> }
> {code}
> h2. Functional tests
> These are common tests to verify the new feature works correctly within
> Flink jobs.
> h3. Test that timed-out requests are retried (at-least-once semantics)
> h4. Steps
> - Pull and compile the {{release-1.20}} branch.
> - Start a Flink cluster from flink-dist: {{./start-cluster.sh}}.
> - Configure the requestTimeout value in your job so that a sample of requests
> times out (see the sketch after the hints).
> - Compile and package your test job.
> - Open the Flink Dashboard, upload the job jar, and submit the job.
> - Verify from the logs that all elements are delivered and that all elements
> delivered with a delay greater than the configured timeout are resubmitted.
> *hint 1*: It is advised to use a timeout close to the simulated delay in the
> sink so that the number of retried requests stays small enough to track.
> *hint 2*: It is also advised to use a debugger to inspect timed-out requests
> on the fly.
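> A minimal sketch of wiring the timeout into the test job; the 2000 ms value
> and the job name are illustrative assumptions, chosen so the timeout sits
> close to the sink's simulated delay of up to 5000 ms:
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.fromSequence(0, 100)
>         .map(Object::toString)
>         // requestTimeoutMS = 2000 ms (close to the simulated delay), failOnTimeout = false
>         .sinkTo(new DiscardingTestAsyncSink<>(2000L, false));
> env.execute("AsyncSink timeout retry test");
> {code}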
> h3. Test that fail-on-timeout fails the job
> h4. Steps
> Follow the same steps for setting up the job and cluster, but enable the
> {{failOnTimeout}} flag (see the sketch below).
> - Verify that the job fails with a {{TimeoutException}} visible in the job logs.
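> Only the sink construction needs to change compared to the previous test; the
> 1000 ms timeout and job name are illustrative assumptions:
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> env.fromSequence(0, 100)
>         .map(Object::toString)
>         // failOnTimeout = true: a timed-out request fails the job instead of being retried
>         .sinkTo(new DiscardingTestAsyncSink<>(1000L, true));
> env.execute("AsyncSink fail-on-timeout test");
> {code}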
> h3. Test With Checkpoints enabled
> h4. Steps
> Follow the same steps for setting up the job and cluster, but with
> checkpointing enabled (see the sketch below).
> - Verify that checkpoints are taken successfully.
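> For example, checkpointing can be enabled on the same test job; the 10 s
> interval and the sink values are arbitrary choices for the test:
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(10_000L); // take a checkpoint every 10 seconds
> env.setParallelism(1);
> env.fromSequence(0, 1000)
>         .map(Object::toString)
>         .sinkTo(new DiscardingTestAsyncSink<>(2000L, false));
> env.execute("AsyncSink checkpointing test");
> {code}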
> h2. Performance testing
> We want to verify that the introduced feature doesn't affect the performance
> of the sink; specifically, that it doesn't introduce unnecessary backpressure.
> h4. Steps
> Execute the same steps for setting up the job and cluster, but use a datagen
> source in the test job to control throughput, and keep the default
> requestTimeout of 10 minutes (see the sketch below).
> - Run the job until it reaches a stable state.
> - Verify the sink doesn't introduce backpressure to the job.
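> A sketch of such a job, assuming the {{DataGeneratorSource}} from
> flink-connector-datagen; the 10,000 records/second rate is an illustrative
> choice, and the sink keeps the 10-minute default timeout with
> {{failOnTimeout}} disabled:
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> GeneratorFunction<Long, String> generator = index -> "record-" + index;
> DataGeneratorSource<String> source =
>         new DataGeneratorSource<>(
>                 generator,
>                 Long.MAX_VALUE,                        // effectively unbounded
>                 RateLimiterStrategy.perSecond(10_000), // control throughput
>                 Types.STRING);
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen")
>         // default requestTimeoutMS of 10 minutes, failOnTimeout = false
>         .sinkTo(new DiscardingTestAsyncSink<>(600_000L, false));
> env.execute("AsyncSink performance test");
> {code}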
> h2. Regression Testing
> We need to verify that the change doesn't cause regressions in existing
> implementers; for example, we want to make sure there is no significant amount
> of duplicate data caused by timeouts with the default values when using an
> existing sink.
> The following implementers are under community support:
> {{Kinesis, Firehose, DynamoDb, ElasticSearch}}. It is advisable to test with all of them.
> h4. Steps
> - Run a simple job that sinks data from a datagen source to the sink under
> test (see the sketch after this list).
> - Benchmark the throughput to the sink destination.
> - Clone the sink connector repo, e.g.
> https://github.com/apache/flink-connector-aws
> - Update the Flink version in the repo to {{1.20-SNAPSHOT}}.
> - Rerun the job and compare the throughput metrics with the benchmark.
> - Verify there is no regression between the two cases.
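> For example, a minimal job against the Kinesis sink could look like the sketch
> below; the stream name and region are hypothetical, and the {{env}} and
> {{source}} are the same datagen setup as in the performance test:
> {code}
> Properties sinkProperties = new Properties();
> sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1"); // hypothetical region
>
> KinesisStreamsSink<String> sink =
>         KinesisStreamsSink.<String>builder()
>                 .setKinesisClientProperties(sinkProperties)
>                 .setStreamName("regression-test-stream") // hypothetical stream name
>                 .setSerializationSchema(new SimpleStringSchema())
>                 .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
>                 .build();
>
> env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen")
>         .sinkTo(sink);
> {code}
> The same job is run twice, once against the released connector and once after
> bumping the repo's Flink version to {{1.20-SNAPSHOT}}, and the throughput
> metrics of the two runs are compared.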