dannycranmer commented on code in PR #52:
URL: https://github.com/apache/flink-connector-aws/pull/52#discussion_r1097615238


##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java:
##########
@@ -326,7 +326,7 @@ public RecordQueue<RecordWrapper<T>> getQueue(int producerIndex) {
             return queues.computeIfAbsent(
                     producerIndex,
                     (key) ->
-                            new RecordQueue<RecordWrapper<T>>() {
+                            new RecordEmitter.RecordQueue<RecordWrapper<T>>() {

Review Comment:
   nit: Qualifying `RecordQueue` with `RecordEmitter.` looks unnecessary here.



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicShardAssignerFactory.java:
##########
@@ -0,0 +1,23 @@
+package org.apache.flink.streaming.connectors.kinesis.table;

Review Comment:
   Missing Apache copyright header
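
For reference, a sketch of what the top of the new file would look like with the standard ASF source header in place. The header text is the usual Apache License 2.0 boilerplate; the assumption here is that the project's license check expects exactly this form, so it is worth comparing against an existing file in the repository.

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.connectors.kinesis.table;
```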



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicShardAssignerFactory.java:
##########
@@ -0,0 +1,23 @@
+package org.apache.flink.streaming.connectors.kinesis.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
+
+/** Factory utility to dynamically load a {@link KinesisShardAssigner} instance. */
+@PublicEvolving
+public interface KinesisDynamicShardAssignerFactory {
+
+    /**
+     * Returns the identifier of shard assigner so it can uniquely identified and loaded.

Review Comment:
   ```suggestion
        * Returns the identifier of shard assigner so it can be uniquely identified and loaded.
   ```



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/UniformShardAssignerFactory.java:
##########
@@ -0,0 +1,23 @@
+package org.apache.flink.streaming.connectors.kinesis.table;
+
+import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
+import org.apache.flink.streaming.connectors.kinesis.util.UniformShardAssigner;
+
+/**
+ * A {@link KinesisDynamicShardAssignerFactory} that loads the {@link UniformShardAssigner} shard
+ * assigner.
+ */
+public class UniformShardAssignerFactory implements KinesisDynamicShardAssignerFactory {

Review Comment:
   Missing Flink compatibility annotation. Again, I would suggest `@Internal` for this class.



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicSource.java:
##########
@@ -219,4 +245,30 @@ public int hashCode() {
                 consumerProperties,
                 decodingFormat);
     }
+
+    private KinesisShardAssigner getShardAssigner(String shardAssignerIdentifier) {
+        try {
+            ServiceLoader<KinesisDynamicShardAssignerFactory> loader =
+                    ServiceLoader.load(KinesisDynamicShardAssignerFactory.class);
+            for (KinesisDynamicShardAssignerFactory factory : loader) {
+                if (factory.shardAssignerIdentifer().equals(shardAssignerIdentifier)) {
+                    return factory.getShardAssigner();
+                }
+            }
+        } catch (Exception e) {
+            String msg =
+                    String.format(
+                            "Error while attempting to load shard assigner with identifier: '{}'",
+                            shardAssignerIdentifier);
+            LOG.error(msg, e);
+
+            return null;

Review Comment:
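   nit: `String.format` does not substitute `{}` placeholders, so the identifier never makes it into the message. Pass the arguments straight to the logger instead:
   
   ```
   LOG.error("Error while attempting to load shard assigner with identifier: '{}'", shardAssignerIdentifier, e);
   ```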
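
To illustrate the placeholder issue in the hunk above: `String.format` only interprets `%` specifiers, so the `'{}'` in the diff is emitted literally and the extra argument is silently ignored. A minimal, stdlib-only sketch follows. The class and method names are hypothetical and not part of the PR, and the logger itself is left out so the example has no dependencies.

```java
// Hypothetical demo class; not part of the PR under review.
class PlaceholderDemo {

    // Same pattern as the diff: "{}" is a logger-style placeholder, which
    // String.format treats as literal text. The unused argument is ignored.
    static String formatWithBraces(String identifier) {
        return String.format(
                "Error while attempting to load shard assigner with identifier: '{}'",
                identifier);
    }

    // If String.format were kept, "%s" is the specifier it understands.
    static String formatWithPercentS(String identifier) {
        return String.format(
                "Error while attempting to load shard assigner with identifier: '%s'",
                identifier);
    }

    public static void main(String[] args) {
        // Prints the message with a literal '{}' at the end.
        System.out.println(formatWithBraces("uniform"));
        // Prints the message with 'uniform' substituted.
        System.out.println(formatWithPercentS("uniform"));
    }
}
```

Handing the template and arguments directly to the logger, as suggested, avoids the intermediate string entirely and lets the logging framework do the substitution.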



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/DefaultShardAssignerFactory.java:
##########
@@ -0,0 +1,22 @@
+package org.apache.flink.streaming.connectors.kinesis.table;
+
+import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
+
+/** A {@link KinesisDynamicShardAssignerFactory} that loads the default shard assigner. */
+public class DefaultShardAssignerFactory implements KinesisDynamicShardAssignerFactory {

Review Comment:
   Missing Flink compatibility annotation. I would suggest this is `@Internal` 



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/DefaultShardAssignerFactory.java:
##########
@@ -0,0 +1,22 @@
+package org.apache.flink.streaming.connectors.kinesis.table;

Review Comment:
   Missing Apache copyright header



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/UniformShardAssignerFactory.java:
##########
@@ -0,0 +1,23 @@
+package org.apache.flink.streaming.connectors.kinesis.table;

Review Comment:
   Missing Apache copyright header



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
