[ 
https://issues.apache.org/jira/browse/FLINK-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088020#comment-16088020
 ] 

Bowen Li commented on FLINK-6365:
---------------------------------

We ran into this issue, too. The current default of 100 for 
SHARD_GETRECORDS_MAX is too small, and we got exceptions like:


{code:java}
2017-07-14 19:53:02,995 INFO  org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient  - Unable to execute HTTP request: Broken pipe (Write failed)
java.net.SocketException: Broken pipe (Write failed)
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
        at sun.security.ssl.OutputRecord.writeBuffer(OutputRecord.java:431)
        at sun.security.ssl.OutputRecord.write(OutputRecord.java:417)
        at sun.security.ssl.SSLSocketImpl.writeRecordInternal(SSLSocketImpl.java:886)
        at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:857)
        at sun.security.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:727)
        at sun.security.ssl.Handshaker.sendChangeCipherSpec(Handshaker.java:1124)
        at sun.security.ssl.ClientHandshaker.sendChangeCipherAndFinish(ClientHandshaker.java:1216)
        at sun.security.ssl.ClientHandshaker.serverHelloDone(ClientHandshaker.java:1128)
        at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:348)
        at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
        at sun.security.ssl.Handshaker.process_record(Handshaker.java:961)
        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)
        at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
        at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:553)
        at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:412)
        at org.apache.flink.kinesis.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:134)
        at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:179)
        at org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:328)
        at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:612)
        at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:447)
        at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
        at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
        at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:837)
        at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
        at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
        at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
        at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
        at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1940)
        at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1910)
        at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:936)
        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:199)
        at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:292)
        at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:200)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
{code}


I'll take this ticket and propose a default value (5000 or 10000) based on my 
use case with Flink and Kinesis.
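
For a rough sense of why 100 records per call is restrictive, the arithmetic can 
be sketched as follows. This is only an illustration, not connector code: the 
class and method names are hypothetical, the limit of 5 get operations per second 
per shard is taken from the issue description below, and the per-shard byte 
limits on reads are ignored.

```java
public class ShardReadBudget {
    // Limit cited in the issue description: 5 GetRecords calls per second per shard.
    static final int MAX_GET_CALLS_PER_SECOND = 5;

    /** Smallest polling interval (ms) that stays within the per-shard call limit. */
    static long minIntervalMillis() {
        return 1000L / MAX_GET_CALLS_PER_SECOND;
    }

    /**
     * Upper bound on records read per second from one shard, assuming every
     * call returns a full batch and byte-based limits are not the bottleneck.
     */
    static long maxRecordsPerSecond(int maxRecordsPerCall, long intervalMillis) {
        // Polling faster than the call limit allows only produces throttling,
        // so the effective interval is never below minIntervalMillis().
        long effectiveInterval = Math.max(intervalMillis, minIntervalMillis());
        long callsPerSecond = 1000L / effectiveInterval;
        return callsPerSecond * maxRecordsPerCall;
    }

    public static void main(String[] args) {
        System.out.println(minIntervalMillis());              // interval matching 5 calls/s
        System.out.println(maxRecordsPerSecond(100, 0));      // 100 records per call, interval 0
        System.out.println(maxRecordsPerSecond(10000, 200));  // proposed values
    }
}
```

Under these assumptions, 100 records per call caps a shard at 500 records per 
second no matter how often it is polled, while 10000 records per call at a 
200 ms interval raises that ceiling to 50000.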

> Adapt default values of the Kinesis connector
> ---------------------------------------------
>
>                 Key: FLINK-6365
>                 URL: https://issues.apache.org/jira/browse/FLINK-6365
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.2.0
>            Reporter: Steffen Hausmann
>            Assignee: Bowen Li
>            Priority: Minor
>
> As discussed in 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kinesis-connector-SHARD-GETRECORDS-MAX-default-value-td12332.html,
> it seems reasonable to change the default values of the Kinesis connector to 
> follow KCL’s default settings. I suggest adapting at least the values for 
> SHARD_GETRECORDS_MAX and SHARD_GETRECORDS_INTERVAL_MILLIS.
> As a Kinesis shard is currently limited to 5 get operations per second, you 
> can observe high ReadProvisionedThroughputExceeded rates with the current 
> default value for SHARD_GETRECORDS_INTERVAL_MILLIS of 0; it seems reasonable 
> to increase it to 200. As described in the email thread, it also seems 
> desirable to increase the default value for SHARD_GETRECORDS_MAX 
> to 10000.
> The values that are used by the KCL can be found here: 
> https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
> Thanks for looking into this!
> Steffen
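
Until the defaults change, the two settings can be overridden per job. The 
following is a minimal sketch, assuming the property keys below are the string 
values behind ConsumerConfigConstants.SHARD_GETRECORDS_MAX and 
ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS; please verify them 
against the connector version in use. The class and method names are 
hypothetical, and region and credential settings are omitted.

```java
import java.util.Properties;

public class KinesisConsumerSettings {
    // Assumed key strings for the two connector settings named in this issue;
    // in a real job, prefer the ConsumerConfigConstants fields themselves.
    static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
    static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";

    /** Builds consumer properties using the values suggested in the issue description. */
    static Properties withProposedDefaults() {
        Properties config = new Properties();
        config.setProperty(SHARD_GETRECORDS_MAX, "10000");           // records per GetRecords call
        config.setProperty(SHARD_GETRECORDS_INTERVAL_MILLIS, "200"); // pause between calls, in ms
        return config;
    }

    public static void main(String[] args) {
        // Print the resulting key/value pairs for inspection.
        withProposedDefaults().list(System.out);
    }
}
```

The resulting Properties object would then be passed to the FlinkKinesisConsumer 
constructor along with the stream name and deserialization schema.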



