hlteoh37 commented on code in PR #49:
URL: 
https://github.com/apache/flink-connector-aws/pull/49#discussion_r1193430853


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer;
+import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy;
+import 
org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter;
+import 
org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader;
+import 
org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import 
org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.utils.AttributeMap;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * The {@link KinesisStreamsSource} is an exactly-once parallel streaming data 
source that
+ * subscribes to a single AWS Kinesis data stream. It is able to handle 
resharding of streams, and
+ * stores its current progress in Flink checkpoints. The source will read in 
data from the Kinesis
+ * Data stream, deserialize it using the provided {@link 
DeserializationSchema}, and emit the record
+ * into the Flink job graph.
+ *
+ * <p>Exactly-once semantics. To leverage Flink's checkpointing mechanics for 
exactly-once stream
+ * processing, the Kinesis Source is implemented with the AWS Java SDK, 
instead of the officially
+ * recommended AWS Kinesis Client Library. The source will store its current 
progress in Flink
+ * checkpoint/savepoint, and will pick up from where it left off upon restore 
from the
+ * checkpoint/savepoint.
+ *
+ * <p>Initial starting points. The Kinesis Streams Source supports reads 
starting from TRIM_HORIZON,
+ * LATEST, and AT_TIMESTAMP.
+ *
+ * @param <T> the data type emitted by the source
+ */
+@Experimental
+public class KinesisStreamsSource<T>
+        implements Source<T, KinesisShardSplit, 
KinesisStreamsSourceEnumeratorState> {
+
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final DeserializationSchema<T> deserializationSchema;
+
+    public KinesisStreamsSource(
+            String streamArn,
+            Properties consumerConfig,

Review Comment:
   Yes, that was the intention, but we needed to refactor aws-base to also 
handle a configuration object rather than a properties object. That's why I 
created a follow-up here:
   https://issues.apache.org/jira/browse/FLINK-31990



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to