dannycranmer commented on code in PR #52:
URL:
https://github.com/apache/flink-connector-aws/pull/52#discussion_r1097615238
##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java:
##########
@@ -326,7 +326,7 @@ public RecordQueue<RecordWrapper<T>> getQueue(int producerIndex) {
return queues.computeIfAbsent(
producerIndex,
(key) ->
- new RecordQueue<RecordWrapper<T>>() {
+ new RecordEmitter.RecordQueue<RecordWrapper<T>>() {
Review Comment:
nit: This change is unnecessary; the unqualified `RecordQueue` already resolved here.
##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicShardAssignerFactory.java:
##########
@@ -0,0 +1,23 @@
+package org.apache.flink.streaming.connectors.kinesis.table;
Review Comment:
Missing Apache license header
##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicShardAssignerFactory.java:
##########
@@ -0,0 +1,23 @@
+package org.apache.flink.streaming.connectors.kinesis.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
+
+/** Factory utility to dynamically load a {@link KinesisShardAssigner} instance. */
+@PublicEvolving
+public interface KinesisDynamicShardAssignerFactory {
+
+ /**
+ * Returns the identifier of shard assigner so it can uniquely identified and loaded.
Review Comment:
```suggestion
* Returns the identifier of the shard assigner so it can be uniquely identified and loaded.
```
##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/UniformShardAssignerFactory.java:
##########
@@ -0,0 +1,23 @@
+package org.apache.flink.streaming.connectors.kinesis.table;
+
+import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
+import org.apache.flink.streaming.connectors.kinesis.util.UniformShardAssigner;
+
+/**
+ * A {@link KinesisDynamicShardAssignerFactory} that loads the {@link UniformShardAssigner} shard
+ * assigner.
+ */
+public class UniformShardAssignerFactory implements KinesisDynamicShardAssignerFactory {
Review Comment:
Missing Flink compatibility annotation. I would suggest `@Internal` for this too.
##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/KinesisDynamicSource.java:
##########
@@ -219,4 +245,30 @@ public int hashCode() {
consumerProperties,
decodingFormat);
}
+
+ private KinesisShardAssigner getShardAssigner(String shardAssignerIdentifier) {
+ try {
+ ServiceLoader<KinesisDynamicShardAssignerFactory> loader =
+ ServiceLoader.load(KinesisDynamicShardAssignerFactory.class);
+ for (KinesisDynamicShardAssignerFactory factory : loader) {
+ if (factory.shardAssignerIdentifer().equals(shardAssignerIdentifier)) {
+ return factory.getShardAssigner();
+ }
+ }
+ } catch (Exception e) {
+ String msg =
+ String.format(
+ "Error while attempting to load shard assigner with identifier: '{}'",
+ shardAssignerIdentifier);
+ LOG.error(msg, e);
+
+ return null;
Review Comment:
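nit: `String.format` does not expand `{}`, so the identifier never reaches the message. Pass the arguments to the logger directly and let it fill the placeholder:
```
LOG.error(
        "Error while attempting to load shard assigner with identifier: '{}'",
        shardAssignerIdentifier,
        e);
```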
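To illustrate why the `String.format` call in the hunk above does not produce the intended message, here is a minimal standalone sketch. The class and method names (`PlaceholderDemo`, `formatWithBraces`, `formatWithPercent`) are hypothetical and not part of the connector; the sketch only uses `java.lang.String.format` and does not touch the logger.

```java
/** Hypothetical demo class, not part of the connector. */
final class PlaceholderDemo {

    private PlaceholderDemo() {}

    /** String.format only understands %-style specifiers, so "{}" stays literal and the argument is ignored. */
    static String formatWithBraces(String id) {
        return String.format(
                "Error while attempting to load shard assigner with identifier: '{}'", id);
    }

    /** With %s, String.format substitutes the argument as expected. */
    static String formatWithPercent(String id) {
        return String.format(
                "Error while attempting to load shard assigner with identifier: '%s'", id);
    }

    public static void main(String[] args) {
        // The first message still ends with '{}'; the second ends with 'uniform'.
        System.out.println(formatWithBraces("uniform"));
        System.out.println(formatWithPercent("uniform"));
    }
}
```

Either switching to `%s` or handing the `{}` template straight to the logger avoids the problem; the latter also skips building the string when the log level is disabled.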
##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/DefaultShardAssignerFactory.java:
##########
@@ -0,0 +1,22 @@
+package org.apache.flink.streaming.connectors.kinesis.table;
+
+import org.apache.flink.streaming.connectors.kinesis.KinesisShardAssigner;
+
+/** A {@link KinesisDynamicShardAssignerFactory} that loads the default shard assigner. */
+public class DefaultShardAssignerFactory implements KinesisDynamicShardAssignerFactory {
Review Comment:
Missing Flink compatibility annotation. I would suggest this is `@Internal`
##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/DefaultShardAssignerFactory.java:
##########
@@ -0,0 +1,22 @@
+package org.apache.flink.streaming.connectors.kinesis.table;
Review Comment:
Missing Apache license header
##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/table/UniformShardAssignerFactory.java:
##########
@@ -0,0 +1,23 @@
+package org.apache.flink.streaming.connectors.kinesis.table;
Review Comment:
Missing Apache license header