dannycranmer commented on code in PR #52:
URL: 
https://github.com/apache/flink-connector-aws/pull/52#discussion_r1095976283


##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -352,6 +352,14 @@ Connector Options
       <td>Integer</td>
       <td>Maximum number of allowed concurrent requests for the EFO client. 
See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
     </tr>
+    <tr>
+      <td><h5>scan.stream.shardassigner</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">DEFAULT</td>
+      <td>String</td>
+      <td>The shard assigner used to map shards to Flink sub tasks 
(DEFAULT|UNIFORM).</td></td>
+    </tr>

Review Comment:
   Can you also update the CN docs, they should be English until they are 
translated. Unless you speak Chinese?!



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java:
##########
@@ -242,6 +244,8 @@ public FlinkKinesisConsumer(
                     "Flink Kinesis Consumer is going to read the following 
streams: {}",
                     sb.toString());
         }
+
+        this.shardAssigner = createShardAssigner();

Review Comment:
   While this works, I have a couple of recommendations I would like to hear 
your thoughts on:
   1. We should move this logic into the Table API wrapper. Unless you see 
benefit in this being configurable via properties for the DS connector. I see 
value in them being consistent.
   2. Since you are using an enum this implementation violates the `O` in 
`SOLID` since it is closed for extension. Users can already set custom shard 
assigners in the datastream API but this will not allow custom assigners in 
Table API. What are you thoughts on using service loader to allow shard 
assigners to be loaded dynamically? Similar to how table connectors are loaded 
today  



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java:
##########
@@ -549,6 +553,21 @@ protected KinesisDataFetcher<T> createFetcher(
                 watermarkTracker);
     }
 
+    protected KinesisShardAssigner createShardAssigner() {

Review Comment:
   This method can/should be private



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to