http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml new file mode 100644 index 0000000..9e0c7e8 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/pom.xml @@ -0,0 +1,164 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.2-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-kinesis_2.10</artifactId> + <name>flink-connector-kinesis</name> + <properties> + <aws.sdk.version>1.10.71</aws.sdk.version> + <aws.kinesis-kcl.version>1.6.2</aws.kinesis-kcl.version> + <aws.kinesis-kpl.version>0.10.2</aws.kinesis-kpl.version> + </properties> + + <packaging>jar</packaging> + + <dependencies> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <!-- Note: + The below dependencies are licenced under the Amazon Software License. + Flink includes the "flink-connector-kinesis" only as an optional dependency for that reason. 
+ --> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-kinesis</artifactId> + <version>${aws.sdk.version}</version> + </dependency> + + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>amazon-kinesis-producer</artifactId> + <version>${aws.kinesis-kpl.version}</version> + </dependency> + + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>amazon-kinesis-client</artifactId> + <version>${aws.kinesis-kcl.version}</version> + <!-- + We're excluding the below from the KCL since we'll only be using the + com.amazonaws.services.kinesis.clientlibrary.types.UserRecord class, which will not need these dependencies. + --> + <exclusions> + <exclusion> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-dynamodb</artifactId> + </exclusion> + <exclusion> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-cloudwatch</artifactId> + </exclusion> + </exclusions> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <promoteTransitiveDependencies>true</promoteTransitiveDependencies> + <artifactSet combine.children="append"> + <includes> + <include>com.amazonaws:*</include> + <include>com.google.protobuf:*</include> + </includes> + </artifactSet> + <relocations combine.children="override"> + <!-- DO NOT RELOCATE GUAVA IN THIS PACKAGE --> + <relocation> + <pattern>org.objectweb.asm</pattern> + <shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern> + </relocation> + <relocation> + <pattern>com.google.protobuf</pattern> + 
<shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern> + </relocation> + <relocation> + <pattern>com.amazonaws</pattern> + <shadedPattern>org.apache.flink.kinesis.shaded.com.amazonaws</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java new file mode 100644 index 0000000..a62dc10 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The Flink Kinesis Consumer is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis + * streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is + * responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will + * change as shards are closed and created by Kinesis. 
+ * + * <p>To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis + * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for + * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial + * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.</p> + * + * @param <T> the type of data emitted + */ +public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> + implements CheckpointedAsynchronously<HashMap<KinesisStreamShard, SequenceNumber>>, ResultTypeQueryable<T> { + + private static final long serialVersionUID = 4724006128720664870L; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); + + // ------------------------------------------------------------------------ + // Consumer properties + // ------------------------------------------------------------------------ + + /** The names of the Kinesis streams that we will be consuming from */ + private final List<String> streams; + + /** Properties to parametrize settings such as AWS service region, initial position in stream, + * shard list retrieval behaviours, etc */ + private final Properties configProps; + + /** User supplied deseriliazation schema to convert Kinesis byte messages to Flink objects */ + private final KinesisDeserializationSchema<T> deserializer; + + // ------------------------------------------------------------------------ + // Runtime state + // ------------------------------------------------------------------------ + + /** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards */ + private transient KinesisDataFetcher<T> fetcher; + + /** The sequence numbers in the last state snapshot of this subtask */ + private transient HashMap<KinesisStreamShard, SequenceNumber> lastStateSnapshot; + + /** The sequence 
numbers to restore to upon restore from failure */ + private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore; + + private volatile boolean running = true; + + + // ------------------------------------------------------------------------ + // Constructors + // ------------------------------------------------------------------------ + + /** + * Creates a new Flink Kinesis Consumer. + * + * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming + * from are configured with a {@link Properties} instance.</p> + * + * @param stream + * The single AWS Kinesis stream to read from. + * @param deserializer + * The deserializer used to convert raw bytes of Kinesis records to Java objects (without key). + * @param configProps + * The properties used to configure AWS credentials, AWS region, and initial starting position. + */ + public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps) { + this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer), configProps); + } + + /** + * Creates a new Flink Kinesis Consumer. + * + * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming + * from are configured with a {@link Properties} instance.</p> + * + * @param stream + * The single AWS Kinesis stream to read from. + * @param deserializer + * The keyed deserializer used to convert raw bytes of Kinesis records to Java objects. + * @param configProps + * The properties used to configure AWS credentials, AWS region, and initial starting position. + */ + public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps) { + this(Collections.singletonList(stream), deserializer, configProps); + } + + /** + * Creates a new Flink Kinesis Consumer. 
+ * + * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming + * from are configured with a {@link Properties} instance.</p> + * + * @param streams + * The AWS Kinesis streams to read from. + * @param deserializer + * The keyed deserializer used to convert raw bytes of Kinesis records to Java objects. + * @param configProps + * The properties used to configure AWS credentials, AWS region, and initial starting position. + */ + public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps) { + checkNotNull(streams, "streams can not be null"); + checkArgument(streams.size() != 0, "must be consuming at least 1 stream"); + checkArgument(!streams.contains(""), "stream names cannot be empty Strings"); + this.streams = streams; + + this.configProps = checkNotNull(configProps, "configProps can not be null"); + + // check the configuration properties for any conflicting settings + KinesisConfigUtil.validateConsumerConfiguration(this.configProps); + + this.deserializer = checkNotNull(deserializer, "deserializer can not be null"); + + if (LOG.isInfoEnabled()) { + StringBuilder sb = new StringBuilder(); + for (String stream : streams) { + sb.append(stream).append(", "); + } + LOG.info("Flink Kinesis Consumer is going to read the following streams: {}", sb.toString()); + } + } + + // ------------------------------------------------------------------------ + // Source life cycle + // ------------------------------------------------------------------------ + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + // restore to the last known sequence numbers from the latest complete snapshot + if (sequenceNumsToRestore != null) { + if (LOG.isInfoEnabled()) { + LOG.info("Subtask {} is restoring sequence numbers {} from previous checkpointed state", + getRuntimeContext().getIndexOfThisSubtask(), 
sequenceNumsToRestore.toString()); + } + + // initialize sequence numbers with restored state + lastStateSnapshot = sequenceNumsToRestore; + } else { + // start fresh with empty sequence numbers if there are no snapshots to restore from. + lastStateSnapshot = new HashMap<>(); + } + } + + @Override + public void run(SourceContext<T> sourceContext) throws Exception { + + // all subtasks will run a fetcher, regardless of whether or not the subtask will initially have + // shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks + // can potentially have new shards to subscribe to later on + fetcher = new KinesisDataFetcher<>( + streams, sourceContext, getRuntimeContext(), configProps, deserializer); + + boolean isRestoringFromFailure = (sequenceNumsToRestore != null); + fetcher.setIsRestoringFromFailure(isRestoringFromFailure); + + // if we are restoring from a checkpoint, we iterate over the restored + // state and accordingly seed the fetcher with subscribed shards states + if (isRestoringFromFailure) { + for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) { + fetcher.advanceLastDiscoveredShardOfStream( + restored.getKey().getStreamName(), restored.getKey().getShard().getShardId()); + + if (LOG.isInfoEnabled()) { + LOG.info("Subtask {} is seeding the fetcher with restored shard {}," + + " starting state set to the restored sequence number {}", + getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), restored.getValue()); + } + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState(restored.getKey(), restored.getValue())); + } + } + + // check that we are running before starting the fetcher + if (!running) { + return; + } + + // start the fetcher loop. 
The fetcher will stop running only when cancel() or + // close() is called, or an error is thrown by threads created by the fetcher + fetcher.runFetcher(); + + // check that the fetcher has terminated before fully closing + fetcher.awaitTermination(); + sourceContext.close(); + } + + @Override + public void cancel() { + running = false; + + KinesisDataFetcher fetcher = this.fetcher; + this.fetcher = null; + + // this method might be called before the subtask actually starts running, + // so we must check if the fetcher is actually created + if (fetcher != null) { + try { + // interrupt the fetcher of any work + fetcher.shutdownFetcher(); + fetcher.awaitTermination(); + } catch (Exception e) { + LOG.warn("Error while closing Kinesis data fetcher", e); + } + } + } + + @Override + public void close() throws Exception { + cancel(); + super.close(); + } + + @Override + public TypeInformation<T> getProducedType() { + return deserializer.getProducedType(); + } + + // ------------------------------------------------------------------------ + // State Snapshot & Restore + // ------------------------------------------------------------------------ + + @Override + public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + if (lastStateSnapshot == null) { + LOG.debug("snapshotState() requested on not yet opened source; returning null."); + return null; + } + + if (fetcher == null) { + LOG.debug("snapshotState() requested on not yet running source; returning null."); + return null; + } + + if (!running) { + LOG.debug("snapshotState() called on closed source; returning null."); + return null; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotting state ..."); + } + + lastStateSnapshot = fetcher.snapshotState(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}", + lastStateSnapshot.toString(), checkpointId, 
checkpointTimestamp); + } + + return lastStateSnapshot; + } + + @Override + public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception { + sequenceNumsToRestore = restoredState; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java new file mode 100644 index 0000000..579bd6b --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kinesis; + +import com.amazonaws.services.kinesis.producer.Attempt; +import com.amazonaws.services.kinesis.producer.KinesisProducer; +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; +import com.amazonaws.services.kinesis.producer.UserRecordFailedException; +import com.amazonaws.services.kinesis.producer.UserRecordResult; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; +import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.util.PropertiesUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis. + * + * @param <OUT> Data type to produce into Kinesis Streams + */ +public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class); + + /** Properties to parametrize settings such as AWS service region, access key etc. 
*/ + private final Properties configProps; + + /* Flag controlling the error behavior of the producer */ + private boolean failOnError = false; + + /* Name of the default stream to produce to. Can be overwritten by the serialization schema */ + private String defaultStream; + + /* Default partition id. Can be overwritten by the serialization schema */ + private String defaultPartition; + + /* Schema for turning the OUT type into a byte array. */ + private final KinesisSerializationSchema<OUT> schema; + + /* Optional custom partitioner */ + private KinesisPartitioner<OUT> customPartitioner = null; + + + // --------------------------- Runtime fields --------------------------- + + + /* Our Kinesis instance for each parallel Flink sink */ + private transient KinesisProducer producer; + + /* Callback handling failures */ + private transient FutureCallback<UserRecordResult> callback; + + /* Field for async exception */ + private transient volatile Throwable thrownException; + + + // --------------------------- Initialization and configuration --------------------------- + + + /** + * Create a new FlinkKinesisProducer. + * This is a constructor supporting Flink's {@see SerializationSchema}. + * + * @param schema Serialization schema for the data type + * @param configProps The properties used to configure AWS credentials and AWS region + */ + public FlinkKinesisProducer(final SerializationSchema<OUT> schema, Properties configProps) { + + // create a simple wrapper for the serialization schema + this(new KinesisSerializationSchema<OUT>() { + @Override + public ByteBuffer serialize(OUT element) { + // wrap into ByteBuffer + return ByteBuffer.wrap(schema.serialize(element)); + } + // use default stream and hash key + @Override + public String getTargetStream(OUT element) { + return null; + } + }, configProps); + } + + /** + * Create a new FlinkKinesisProducer. + * This is a constructor supporting {@see KinesisSerializationSchema}. 
+ * + * @param schema Kinesis serialization schema for the data type + * @param configProps The properties used to configure AWS credentials and AWS region + */ + public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps) { + this.configProps = checkNotNull(configProps, "configProps can not be null"); + + // check the configuration properties for any conflicting settings + KinesisConfigUtil.validateProducerConfiguration(this.configProps); + + ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema)); + this.schema = schema; + } + + /** + * If set to true, the producer will immediately fail with an exception on any error. + * Otherwise, the errors are logged and the producer goes on. + * + * @param failOnError Error behavior flag + */ + public void setFailOnError(boolean failOnError) { + this.failOnError = failOnError; + } + + /** + * Set a default stream name. + * @param defaultStream Name of the default Kinesis stream + */ + public void setDefaultStream(String defaultStream) { + this.defaultStream = defaultStream; + } + + /** + * Set default partition id + * @param defaultPartition Name of the default partition + */ + public void setDefaultPartition(String defaultPartition) { + this.defaultPartition = defaultPartition; + } + + public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) { + Objects.requireNonNull(partitioner); + ClosureCleaner.ensureSerializable(partitioner); + this.customPartitioner = partitioner; + } + + + // --------------------------- Lifecycle methods --------------------------- + + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration(); + + producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION)); + producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps)); + if 
(configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) { + producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps, + ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG)); + } + if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) { + producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps, + ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG)); + } + + producer = new KinesisProducer(producerConfig); + callback = new FutureCallback<UserRecordResult>() { + @Override + public void onSuccess(UserRecordResult result) { + if (!result.isSuccessful()) { + if(failOnError) { + thrownException = new RuntimeException("Record was not sent successful"); + } else { + LOG.warn("Record was not sent successful"); + } + } + } + + @Override + public void onFailure(Throwable t) { + if (failOnError) { + thrownException = t; + } else { + LOG.warn("An exception occurred while processing a record", t); + } + } + }; + + if (this.customPartitioner != null) { + this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } + + LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion()); + } + + @Override + public void invoke(OUT value) throws Exception { + if (this.producer == null) { + throw new RuntimeException("Kinesis producer has been closed"); + } + if (thrownException != null) { + String errorMessages = ""; + if (thrownException instanceof UserRecordFailedException) { + List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts(); + for (Attempt attempt: attempts) { + if (attempt.getErrorMessage() != null) { + errorMessages += attempt.getErrorMessage() +"\n"; + } + } + } + if (failOnError) { + throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, 
thrownException); + } else { + LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages); + thrownException = null; // reset + } + } + + String stream = defaultStream; + String partition = defaultPartition; + + ByteBuffer serialized = schema.serialize(value); + + // maybe set custom stream + String customStream = schema.getTargetStream(value); + if (customStream != null) { + stream = customStream; + } + + String explicitHashkey = null; + // maybe set custom partition + if (customPartitioner != null) { + partition = customPartitioner.getPartitionId(value); + explicitHashkey = customPartitioner.getExplicitHashKey(value); + } + + if (stream == null) { + if (failOnError) { + throw new RuntimeException("No target stream set"); + } else { + LOG.warn("No target stream set. Skipping record"); + return; + } + } + + ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized); + Futures.addCallback(cb, callback); + } + + @Override + public void close() throws Exception { + LOG.info("Closing producer"); + super.close(); + KinesisProducer kp = this.producer; + this.producer = null; + if (kp != null) { + LOG.info("Flushing outstanding {} records", kp.getOutstandingRecordsCount()); + // try to flush all outstanding records + while (kp.getOutstandingRecordsCount() > 0) { + kp.flush(); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + LOG.warn("Flushing was interrupted."); + // stop the blocking flushing and destroy producer immediately + break; + } + } + LOG.info("Flushing done. 
Destroying producer instance."); + kp.destroy(); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java new file mode 100644 index 0000000..bd23abe --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Base class for user-defined partitioning of records written to Kinesis.
 *
 * <p>Subclasses must implement {@link #getPartitionId(Object)}; the remaining methods are optional
 * hooks with no-op / {@code null} defaults.</p>
 *
 * @param <T> type of the elements being partitioned
 */
public abstract class KinesisPartitioner<T> implements Serializable {

    /**
     * Optional initialization hook; the default implementation does nothing.
     *
     * @param indexOfThisSubtask Index of this partitioner instance
     * @param numberOfParallelSubtasks Total number of parallel instances
     */
    public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) {
        // intentionally empty: subclasses override this when they need the subtask information
    }

    /**
     * Computes the partition id for the given element.
     *
     * @param element Element to partition
     * @return A string representing the partition id
     */
    public abstract String getPartitionId(T element);

    /**
     * Optional hook for supplying an explicit hash key; the default implementation supplies none.
     *
     * @param element Element to get the hash key for
     * @return the hash key for the element, or {@code null} (the default) when no explicit key is set
     */
    public String getExplicitHashKey(T element) {
        return null;
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.config; + +import com.amazonaws.auth.AWSCredentialsProvider; + +/** + * Configuration keys for AWS service usage + */ +public class AWSConfigConstants { + + /** + * Possible configuration values for the type of credential provider to use when accessing AWS Kinesis. + * Internally, a corresponding implementation of {@link AWSCredentialsProvider} will be used. 
+ */ + public enum CredentialProvider { + + /** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials */ + ENV_VAR, + + /** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials */ + SYS_PROP, + + /** Use an AWS credentials profile file to create the AWS credentials */ + PROFILE, + + /** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties */ + BASIC, + + /** A credentials provider chain will be used that searches for credentials in this order: ENV_VAR, SYS_PROP, PROFILE, and the AWS instance metadata **/ + AUTO, + } + + /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */ + public static final String AWS_REGION = "aws.region"; + + /** The AWS access key ID to use when setting credentials provider type to BASIC */ + public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid"; + + /** The AWS secret key to use when setting credentials provider type to BASIC */ + public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey"; + + /** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/ + public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider"; + + /** Optional configuration for profile path if credential provider type is set to be PROFILE */ + public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path"; + + /** Optional configuration for profile name if credential provider type is set to be PROFILE */ + public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name"; + + /** The AWS endpoint for Kinesis (derived from the AWS region setting if not set) */ + public static final String AWS_ENDPOINT = "aws.endpoint"; + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java new file mode 100644 index 0000000..76c20ed --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.config; + +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; + +/** + * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer} + */ +public class ConsumerConfigConstants extends AWSConfigConstants { + + /** + * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used + * when the consumer tasks retrieve the first shard iterator for each Kinesis shard. + */ + public enum InitialPosition { + + /** Start reading from the earliest possible record in the stream (excluding expired data records) */ + TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM), + + /** Start reading from the latest incoming record */ + LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM); + + private final SentinelSequenceNumber sentinelSequenceNumber; + + InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) { + this.sentinelSequenceNumber = sentinelSequenceNumber; + } + + public SentinelSequenceNumber toSentinelSequenceNumber() { + return this.sentinelSequenceNumber; + } + } + + /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ + public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; + + /** The base backoff time between each describeStream attempt */ + public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; + + /** The maximum backoff time between each describeStream attempt */ + public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max"; + + /** The power constant for exponential backoff between each describeStream attempt */ + public 
static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst"; + + /** The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard */ + public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount"; + + /** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException */ + public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries"; + + /** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */ + public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base"; + + /** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */ + public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max"; + + /** The power constant for exponential backoff between each getRecords attempt */ + public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst"; + + /** The interval between each getRecords request to an AWS Kinesis shard in milliseconds */ + public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis"; + + /** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException */ + public static final String SHARD_GETITERATOR_RETRIES = "flink.shard.getiterator.maxretries"; + + /** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */ + public static final String SHARD_GETITERATOR_BACKOFF_BASE = "flink.shard.getiterator.backoff.base"; + + /** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */ + public static final String SHARD_GETITERATOR_BACKOFF_MAX = "flink.shard.getiterator.backoff.max"; + + /** The power 
constant for exponential backoff between each getShardIterator attempt */ + public static final String SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getiterator.backoff.expconst"; + + /** The interval between each attempt to discover new shards */ + public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis"; + + // ------------------------------------------------------------------------ + // Default values for consumer configuration + // ------------------------------------------------------------------------ + + public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString(); + + public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L; + + public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L; + + public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; + + public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100; + + public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3; + + public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L; + + public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L; + + public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; + + public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0L; + + public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3; + + public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L; + + public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L; + + public static final double DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; + + public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L; + + /** + * To avoid shard iterators expiring in {@link ShardConsumer}s, the value for the configured + * getRecords interval cannot exceed 5 minutes, which is the expiry time of retrieved iterators. 
+ */ + public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java new file mode 100644 index 0000000..1edddfc --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.config; + +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; + +/** + * Optional producer specific configuration keys for {@link FlinkKinesisProducer} + */ +public class ProducerConfigConstants extends AWSConfigConstants { + + /** Maximum number of items to pack into a PutRecords request. **/ + public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount"; + + /** Maximum number of items to pack into an aggregated record. **/ + public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount"; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java new file mode 100644 index 0000000..55668c6 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kinesis.examples; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import java.util.Properties; + +/** + * Example job that reads the Kinesis stream "flink-test" as strings and prints each record; requires the parameters region, accesskey and secretkey. + */ +public class ConsumeFromKinesis { + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(1); + + Properties kinesisConsumerConfig = new Properties(); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region")); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accesskey")); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretkey")); + + DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>( + "flink-test", + new SimpleStringSchema(), + kinesisConsumerConfig)); + + kinesis.print(); + + see.execute(); + } + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java new file mode 100644 index 0000000..d178137 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kinesis.examples; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; +import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import java.util.Properties; + +/** + * Example job that writes generated strings to the Kinesis stream "flink-test"; requires the parameters region, accessKey and secretKey. + */ +public class ProduceIntoKinesis { + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + + StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); + see.setParallelism(1); + + DataStream<String> simpleStringStream = see.addSource(new EventsGenerator()); + + Properties kinesisProducerConfig = new Properties(); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey")); + kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey")); + + FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>( + new SimpleStringSchema(), kinesisProducerConfig); + + kinesis.setFailOnError(true); + kinesis.setDefaultStream("flink-test"); + kinesis.setDefaultPartition("0"); + + simpleStringStream.addSink(kinesis); + + see.execute(); + } + + public static class EventsGenerator implements SourceFunction<String> { + private volatile boolean running = true; // volatile so that a cancel() issued from another thread is seen by the run() loop + + @Override + public void run(SourceContext<String> ctx) throws Exception { + long seq = 0; + while(running) { + Thread.sleep(10); + 
ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12)); + } + } + + @Override + public void cancel() { + running = false; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java new file mode 100644 index 0000000..a06fdca --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -0,0 +1,679 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kinesis.internals; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; +import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; +import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; +import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; +import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; +import org.apache.flink.util.InstantiationUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import java.util.LinkedList; +import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A KinesisDataFetcher is responsible for fetching data from multiple Kinesis shards. 
Each parallel subtask instantiates + * and runs a single fetcher throughout the subtask's lifetime. The fetcher accomplishes the following: + * <ul> + * <li>1. continuously poll Kinesis to discover shards that the subtask should subscribe to. The subscribed subset + * of shards, including future new shards, is non-overlapping across subtasks (no two subtasks will be + * subscribed to the same shard) and determinate across subtask restores (the subtask will always subscribe + * to the same subset of shards even after restoring)</li> + * <li>2. decide where in each discovered shard should the fetcher start subscribing to</li> + * <li>3. subscribe to shards by creating a single thread for each shard</li> + * </ul> + * + * <p>The fetcher manages two states: 1) last seen shard ids of each subscribed stream (used for continuous shard discovery), + * and 2) last processed sequence numbers of each subscribed shard. Since operations on the second state will be performed + * by multiple threads, these operations should only be done using the handler methods provided in this class. + */ +public class KinesisDataFetcher<T> { + + private static final Logger LOG = LoggerFactory.getLogger(KinesisDataFetcher.class); + + // ------------------------------------------------------------------------ + // Consumer-wide settings + // ------------------------------------------------------------------------ + + /** Configuration properties for the Flink Kinesis Consumer */ + private final Properties configProps; + + /** The list of Kinesis streams that the consumer is subscribing to */ + private final List<String> streams; + + /** + * The deserialization schema we will be using to convert Kinesis records to Flink objects. + * Note that since this might not be thread-safe, {@link ShardConsumer}s using this must + * clone a copy using {@link KinesisDataFetcher#getClonedDeserializationSchema()}. 
+ */ + private final KinesisDeserializationSchema<T> deserializationSchema; + + // ------------------------------------------------------------------------ + // Subtask-specific settings + // ------------------------------------------------------------------------ + + /** Runtime context of the subtask that this fetcher was created in */ + private final RuntimeContext runtimeContext; + + private final int totalNumberOfConsumerSubtasks; + + private final int indexOfThisConsumerSubtask; + + /** + * This flag should be set by {@link FlinkKinesisConsumer} using + * {@link KinesisDataFetcher#setIsRestoringFromFailure(boolean)} + */ + private boolean isRestoredFromFailure; + + // ------------------------------------------------------------------------ + // Executor services to run created threads + // ------------------------------------------------------------------------ + + /** Executor service to run {@link ShardConsumer}s to consume Kinesis shards */ + private final ExecutorService shardConsumersExecutor; + + // ------------------------------------------------------------------------ + // Managed state, accessed and updated across multiple threads + // ------------------------------------------------------------------------ + + /** The last discovered shard ids of each subscribed stream, updated as the fetcher discovers new shards in. + * Note: this state will be updated if new shards are found when {@link KinesisDataFetcher#discoverNewShardsToSubscribe()} is called. + */ + private final Map<String, String> subscribedStreamsToLastDiscoveredShardIds; + + /** + * The shards, along with their last processed sequence numbers, that this fetcher is subscribed to. The fetcher + * will add new subscribed shard states to this list as it discovers new shards. {@link ShardConsumer} threads update + * the last processed sequence number of subscribed shards as they fetch and process records. 
+ * + * <p>Note that since multiple {@link ShardConsumer} threads will be performing operations on this list, all operations + * must be wrapped in synchronized blocks on the {@link KinesisDataFetcher#checkpointLock} lock. For this purpose, + * all threads must use the following thread-safe methods this class provides to operate on this list: + * <ul> + * <li>{@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}</li> + * <li>{@link KinesisDataFetcher#updateState(int, SequenceNumber)}</li> + * <li>{@link KinesisDataFetcher#emitRecordAndUpdateState(T, long, int, SequenceNumber)}</li> + * </ul> + */ + private final List<KinesisStreamShardState> subscribedShardsState; + + private final SourceFunction.SourceContext<T> sourceContext; + + /** Checkpoint lock, also used to synchronize operations on subscribedShardsState */ + private final Object checkpointLock; + + /** Reference to the first error thrown by any of the {@link ShardConsumer} threads */ + private final AtomicReference<Throwable> error; + + /** The Kinesis proxy that the fetcher will be using to discover new shards */ + private final KinesisProxyInterface kinesis; + + /** Thread that executed runFetcher() */ + private Thread mainThread; + + /** + * The current number of shards that are actively read by this fetcher. + * + * This value is updated in {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, + * and {@link KinesisDataFetcher#updateState(int, SequenceNumber)}. + */ + private final AtomicInteger numberOfActiveShards = new AtomicInteger(0); + + private volatile boolean running = true; + + /** + * Creates a Kinesis Data Fetcher. 
+ * + * @param streams the streams to subscribe to + * @param sourceContext context of the source function + * @param runtimeContext this subtask's runtime context + * @param configProps the consumer configuration properties + * @param deserializationSchema deserialization schema + */ + public KinesisDataFetcher(List<String> streams, + SourceFunction.SourceContext<T> sourceContext, + RuntimeContext runtimeContext, + Properties configProps, + KinesisDeserializationSchema<T> deserializationSchema) { + this(streams, + sourceContext, + sourceContext.getCheckpointLock(), + runtimeContext, + configProps, + deserializationSchema, + new AtomicReference<Throwable>(), + new LinkedList<KinesisStreamShardState>(), + createInitialSubscribedStreamsToLastDiscoveredShardsState(streams), + KinesisProxy.create(configProps)); + } + + /** This constructor is exposed for testing purposes */ + protected KinesisDataFetcher(List<String> streams, + SourceFunction.SourceContext<T> sourceContext, + Object checkpointLock, + RuntimeContext runtimeContext, + Properties configProps, + KinesisDeserializationSchema<T> deserializationSchema, + AtomicReference<Throwable> error, + LinkedList<KinesisStreamShardState> subscribedShardsState, + HashMap<String, String> subscribedStreamsToLastDiscoveredShardIds, + KinesisProxyInterface kinesis) { + this.streams = checkNotNull(streams); + this.configProps = checkNotNull(configProps); + this.sourceContext = checkNotNull(sourceContext); + this.checkpointLock = checkNotNull(checkpointLock); + this.runtimeContext = checkNotNull(runtimeContext); + this.totalNumberOfConsumerSubtasks = runtimeContext.getNumberOfParallelSubtasks(); + this.indexOfThisConsumerSubtask = runtimeContext.getIndexOfThisSubtask(); + this.deserializationSchema = checkNotNull(deserializationSchema); + this.kinesis = checkNotNull(kinesis); + + this.error = checkNotNull(error); + this.subscribedShardsState = checkNotNull(subscribedShardsState); + this.subscribedStreamsToLastDiscoveredShardIds 
= checkNotNull(subscribedStreamsToLastDiscoveredShardIds); + + this.shardConsumersExecutor = + createShardConsumersThreadPool(runtimeContext.getTaskNameWithSubtasks()); + } + + /** + * Starts the fetcher. After starting the fetcher, it can only + * be stopped by calling {@link KinesisDataFetcher#shutdownFetcher()}. + * + * @throws Exception the first error or exception thrown by the fetcher or any of the threads created by the fetcher. + */ + public void runFetcher() throws Exception { + + // check that we are running before proceeding + if (!running) { + return; + } + + this.mainThread = Thread.currentThread(); + + // ------------------------------------------------------------------------ + // Procedures before starting the infinite while loop: + // ------------------------------------------------------------------------ + + // 1. query for any new shards that may have been created while the Kinesis consumer was not running, + // and register them to the subscribedShardState list. + if (LOG.isDebugEnabled()) { + String logFormat = (!isRestoredFromFailure) + ? "Subtask {} is trying to discover initial shards ..." + : "Subtask {} is trying to discover any new shards that were created while the consumer wasn't " + + "running due to failure ..."; + + LOG.debug(logFormat, indexOfThisConsumerSubtask); + } + List<KinesisStreamShard> newShardsCreatedWhileNotRunning = discoverNewShardsToSubscribe(); + for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) { + // the starting state for new shards created while the consumer wasn't running depends on whether or not + // we are starting fresh (not restoring from a checkpoint); when we are starting fresh, this simply means + // all existing shards of streams we are subscribing to are new shards; when we are restoring from checkpoint, + // any new shards due to Kinesis resharding from the time of the checkpoint will be considered new shards. 
+ InitialPosition initialPosition = InitialPosition.valueOf(configProps.getProperty( + ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); + + SentinelSequenceNumber startingStateForNewShard = (isRestoredFromFailure) + ? SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM + : initialPosition.toSentinelSequenceNumber(); + + if (LOG.isInfoEnabled()) { + String logFormat = (!isRestoredFromFailure) + ? "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}" + : "Subtask {} will be seeded with new shard {} that was created while the consumer wasn't " + + "running due to failure, starting state set as sequence number {}"; + + LOG.info(logFormat, indexOfThisConsumerSubtask, shard.toString(), startingStateForNewShard.get()); + } + registerNewSubscribedShardState(new KinesisStreamShardState(shard, startingStateForNewShard.get())); + } + + // 2. check that there is at least one shard in the subscribed streams to consume from (can be done by + // checking if at least one value in subscribedStreamsToLastDiscoveredShardIds is not null) + boolean hasShards = false; + StringBuilder streamsWithNoShardsFound = new StringBuilder(); + for (Map.Entry<String, String> streamToLastDiscoveredShardEntry : subscribedStreamsToLastDiscoveredShardIds.entrySet()) { + if (streamToLastDiscoveredShardEntry.getValue() != null) { + hasShards = true; + } else { + streamsWithNoShardsFound.append(streamToLastDiscoveredShardEntry.getKey()).append(", "); + } + } + + if (streamsWithNoShardsFound.length() != 0 && LOG.isWarnEnabled()) { + LOG.warn("Subtask {} has failed to find any shards for the following subscribed streams: {}", + indexOfThisConsumerSubtask, streamsWithNoShardsFound.toString()); + } + + if (!hasShards) { + throw new RuntimeException("No shards can be found for all subscribed streams: " + streams); + } + + // 3. 
start consuming any shard state we already have in the subscribedShardState up to this point; the + // subscribedShardState may already be seeded with values due to step 1., or explicitly added by the + // consumer using a restored state checkpoint + for (int seededStateIndex = 0; seededStateIndex < subscribedShardsState.size(); seededStateIndex++) { + KinesisStreamShardState seededShardState = subscribedShardsState.get(seededStateIndex); + + // only start a consuming thread if the seeded subscribed shard has not been completely read already + if (!seededShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { + + if (LOG.isInfoEnabled()) { + LOG.info("Subtask {} will start consuming seeded shard {} from sequence number {} with ShardConsumer {}", + indexOfThisConsumerSubtask, seededShardState.getKinesisStreamShard().toString(), + seededShardState.getLastProcessedSequenceNum(), seededStateIndex); + } + + shardConsumersExecutor.submit( + new ShardConsumer<>( + this, + seededStateIndex, + subscribedShardsState.get(seededStateIndex).getKinesisStreamShard(), + subscribedShardsState.get(seededStateIndex).getLastProcessedSequenceNum())); + } + } + + // ------------------------------------------------------------------------ + + // finally, start the infinite shard discovery and consumer launching loop; + // we will escape from this loop only when shutdownFetcher() or stopWithError() is called + + final long discoveryIntervalMillis = Long.valueOf( + configProps.getProperty( + ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, + Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS))); + + // FLINK-4341: + // For downstream operators that work on time (ex. 
window operators), we are required to emit a max value watermark + // for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise + // the downstream watermarks would not advance, leading to unbounded accumulating state. + // + // The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard + // is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks + // will be messed up. + // + // There are 2 cases were we need to either emit a max value watermark, or deliberately fail hard: + // (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max + // value watermark. This case is encountered when 1) all previously read shards by this subtask were closed + // due to resharding, 2) when this subtask was initially only subscribed to closed shards while the consumer + // was told to start from TRIM_HORIZON, or 3) there was initially no shards for this subtask to read on startup. + // (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted + // a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards + // will be subscribed by this subtask after restore as initial shards on startup. 
+ // + // TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager + // Please see FLINK-4341 for more detail + + boolean emittedMaxValueWatermark = false; + + if (this.numberOfActiveShards.get() == 0) { + // FLINK-4341 workaround case (a) - please see the above for details on this case + LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...", + indexOfThisConsumerSubtask); + sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); + emittedMaxValueWatermark = true; + } + + while (running) { + if (LOG.isDebugEnabled()) { + LOG.debug("Subtask {} is trying to discover new shards that were created due to resharding ...", + indexOfThisConsumerSubtask); + } + List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe(); + + // -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards -- + // Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists + // a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards + // may not correctly reflect the discover result in the below case determination. This may lead to incorrect + // case determination on the current discovery attempt, but can still be correctly handled on future attempts. + // + // Although this can be resolved by wrapping the current shard discovery attempt with the below + // case determination within a synchronized block on the checkpoint lock for atomicity, there will be + // considerable throughput performance regression as shard discovery is a remote call to AWS. Therefore, + // since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as + // we can still eventually handle max value watermark emitting / deliberately failing on successive + // discovery attempts. 
+ + if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) { + // FLINK-4341 workaround case (a) - please see the above for details on this case + LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...", + indexOfThisConsumerSubtask); + sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE)); + emittedMaxValueWatermark = true; + } else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) { + // FLINK-4341 workaround case (b) - please see the above for details on this case + // + // Note that in the case where on resharding this subtask ceased to read all of it's previous shards + // but new shards is also to be subscribed by this subtask immediately after, emittedMaxValueWatermark + // will be false; this allows the fetcher to continue reading the new shards without failing on such cases. + // However, due to the race condition mentioned above, we might still fall into case (a) first, and + // then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value + // watermark emitting still remains to be correct. 
+ + LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" + + " up watermarks; the new shards will be subscribed by this subtask after restore ...", + indexOfThisConsumerSubtask, newShardsDueToResharding.size()); + throw new RuntimeException("Deliberate failure to avoid messing up watermarks"); + } + + for (KinesisStreamShard shard : newShardsDueToResharding) { + // since there may be delay in discovering a new shard, all new shards due to + // resharding should be read starting from the earliest record possible + KinesisStreamShardState newShardState = + new KinesisStreamShardState(shard, SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()); + int newStateIndex = registerNewSubscribedShardState(newShardState); + + if (LOG.isInfoEnabled()) { + LOG.info("Subtask {} has discovered a new shard {} due to resharding, and will start consuming " + + "the shard from sequence number {} with ShardConsumer {}", + indexOfThisConsumerSubtask, newShardState.getKinesisStreamShard().toString(), + newShardState.getLastProcessedSequenceNum(), newStateIndex); + } + + shardConsumersExecutor.submit( + new ShardConsumer<>( + this, + newStateIndex, + newShardState.getKinesisStreamShard(), + newShardState.getLastProcessedSequenceNum())); + } + + // we also check if we are running here so that we won't start the discovery sleep + // interval if the running flag was set to false during the middle of the while loop + if (running && discoveryIntervalMillis != 0) { + try { + Thread.sleep(discoveryIntervalMillis); + } catch (InterruptedException iex) { + // the sleep may be interrupted by shutdownFetcher() + } + } + } + + // make sure all resources have been terminated before leaving + awaitTermination(); + + // any error thrown in the shard consumer threads will be thrown to the main thread + Throwable throwable = this.error.get(); + if (throwable != null) { + if (throwable instanceof Exception) { + throw (Exception) throwable; + } else 
if (throwable instanceof Error) { + throw (Error) throwable; + } else { + throw new Exception(throwable); + } + } + } + + /** + * Creates a snapshot of the current last processed sequence numbers of each subscribed shard. + * + * @return state snapshot + */ + public HashMap<KinesisStreamShard, SequenceNumber> snapshotState() { + // this method assumes that the checkpoint lock is held + assert Thread.holdsLock(checkpointLock); + + HashMap<KinesisStreamShard, SequenceNumber> stateSnapshot = new HashMap<>(); + for (KinesisStreamShardState shardWithState : subscribedShardsState) { + stateSnapshot.put(shardWithState.getKinesisStreamShard(), shardWithState.getLastProcessedSequenceNum()); + } + return stateSnapshot; + } + + /** + * Starts shutting down the fetcher. Must be called to allow {@link KinesisDataFetcher#runFetcher()} to complete. + * Once called, the shutdown procedure will be executed and all shard consuming threads will be interrupted. + */ + public void shutdownFetcher() { + running = false; + mainThread.interrupt(); // the main thread may be sleeping for the discovery interval + + if (LOG.isInfoEnabled()) { + LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask); + } + shardConsumersExecutor.shutdownNow(); + } + + /** After calling {@link KinesisDataFetcher#shutdownFetcher()}, this can be called to await the fetcher shutdown */ + public void awaitTermination() throws InterruptedException { + while(!shardConsumersExecutor.isTerminated()) { + Thread.sleep(50); + } + } + + /** Called by created threads to pass on errors. Only the first thrown error is set. + * Once set, the shutdown process will be executed and all shard consuming threads will be interrupted. 
*/ + protected void stopWithError(Throwable throwable) { + if (this.error.compareAndSet(null, throwable)) { + shutdownFetcher(); + } + } + + // ------------------------------------------------------------------------ + // Functions that update the subscribedStreamToLastDiscoveredShardIds state + // ------------------------------------------------------------------------ + + /** Updates the last discovered shard of a subscribed stream; only updates if the update is valid */ + public void advanceLastDiscoveredShardOfStream(String stream, String shardId) { + String lastSeenShardIdOfStream = this.subscribedStreamsToLastDiscoveredShardIds.get(stream); + + // the update is valid only if the given shard id is greater + // than the previous last seen shard id of the stream + if (lastSeenShardIdOfStream == null) { + // if not previously set, simply put as the last seen shard id + this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId); + } else if (KinesisStreamShard.compareShardIds(shardId, lastSeenShardIdOfStream) > 0) { + this.subscribedStreamsToLastDiscoveredShardIds.put(stream, shardId); + } + } + + /** + * A utility function that does the following: + * + * 1. Find new shards for each stream that we haven't seen before + * 2. For each new shard, determine whether this consumer subtask should subscribe to them; + * if yes, it is added to the returned list of shards + * 3. 
Update the subscribedStreamsToLastDiscoveredShardIds state so that we won't get shards + * that we have already seen before the next time this function is called + */ + private List<KinesisStreamShard> discoverNewShardsToSubscribe() throws InterruptedException { + + List<KinesisStreamShard> newShardsToSubscribe = new LinkedList<>(); + + GetShardListResult shardListResult = kinesis.getShardList(subscribedStreamsToLastDiscoveredShardIds); + if (shardListResult.hasRetrievedShards()) { + Set<String> streamsWithNewShards = shardListResult.getStreamsWithRetrievedShards(); + + for (String stream : streamsWithNewShards) { + List<KinesisStreamShard> newShardsOfStream = shardListResult.getRetrievedShardListOfStream(stream); + for (KinesisStreamShard newShard : newShardsOfStream) { + if (isThisSubtaskShouldSubscribeTo(newShard, totalNumberOfConsumerSubtasks, indexOfThisConsumerSubtask)) { + newShardsToSubscribe.add(newShard); + } + } + + advanceLastDiscoveredShardOfStream( + stream, shardListResult.getLastSeenShardOfStream(stream).getShard().getShardId()); + } + } + + return newShardsToSubscribe; + } + + // ------------------------------------------------------------------------ + // Functions to get / set information about the consumer + // ------------------------------------------------------------------------ + + public void setIsRestoringFromFailure(boolean bool) { + this.isRestoredFromFailure = bool; + } + + protected Properties getConsumerConfiguration() { + return configProps; + } + + protected KinesisDeserializationSchema<T> getClonedDeserializationSchema() { + try { + return InstantiationUtil.clone(deserializationSchema, runtimeContext.getUserCodeClassLoader()); + } catch (IOException | ClassNotFoundException ex) { + // this really shouldn't happen; simply wrap it around a runtime exception + throw new RuntimeException(ex); + } + } + + // ------------------------------------------------------------------------ + // Thread-safe operations for record emitting and 
shard state updating + // that assure atomicity with respect to the checkpoint lock + // ------------------------------------------------------------------------ + + /** + * Atomic operation to collect a record and update state to the sequence number of the record. + * This method is called by {@link ShardConsumer}s. + * + * @param record the record to collect + * @param recordTimestamp timestamp to attach to the collected record + * @param shardStateIndex index of the shard to update in subscribedShardsState; + * this index should be the returned value from + * {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called + * when the shard state was registered. + * @param lastSequenceNumber the last sequence number value to update + */ + protected void emitRecordAndUpdateState(T record, long recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) { + synchronized (checkpointLock) { + sourceContext.collectWithTimestamp(record, recordTimestamp); + updateState(shardStateIndex, lastSequenceNumber); + } + } + + /** + * Update the shard to last processed sequence number state. + * This method is called by {@link ShardConsumer}s. + * + * @param shardStateIndex index of the shard to update in subscribedShardsState; + * this index should be the returned value from + * {@link KinesisDataFetcher#registerNewSubscribedShardState(KinesisStreamShardState)}, called + * when the shard state was registered. 
+ * @param lastSequenceNumber the last sequence number value to update + */ + protected void updateState(int shardStateIndex, SequenceNumber lastSequenceNumber) { + synchronized (checkpointLock) { + subscribedShardsState.get(shardStateIndex).setLastProcessedSequenceNum(lastSequenceNumber); + + // if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread, + // we've finished reading the shard and should determine it to be non-active + if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { + this.numberOfActiveShards.decrementAndGet(); + LOG.info("Subtask {} has reached the end of subscribed shard: {}", + indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard()); + } + } + } + + /** + * Register a new subscribed shard state. + * + * @param newSubscribedShardState the new shard state that this fetcher is to be subscribed to + */ + public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribedShardState) { + synchronized (checkpointLock) { + subscribedShardsState.add(newSubscribedShardState); + + // If a registered shard has initial state that is not SENTINEL_SHARD_ENDING_SEQUENCE_NUM (will be the case + // if the consumer had already finished reading a shard before we failed and restored), we determine that + // this subtask has a new active shard + if (!newSubscribedShardState.getLastProcessedSequenceNum().equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { + this.numberOfActiveShards.incrementAndGet(); + } + + return subscribedShardsState.size()-1; + } + } + + // ------------------------------------------------------------------------ + // Miscellaneous utility functions + // ------------------------------------------------------------------------ + + /** + * Utility function to determine whether a shard should be subscribed by this consumer subtask. 
+ * + * @param shard the shard to determine + * @param totalNumberOfConsumerSubtasks total number of consumer subtasks + * @param indexOfThisConsumerSubtask index of this consumer subtask + */ + private static boolean isThisSubtaskShouldSubscribeTo(KinesisStreamShard shard, + int totalNumberOfConsumerSubtasks, + int indexOfThisConsumerSubtask) { + return (Math.abs(shard.hashCode() % totalNumberOfConsumerSubtasks)) == indexOfThisConsumerSubtask; + } + + private static ExecutorService createShardConsumersThreadPool(final String subtaskName) { + return Executors.newCachedThreadPool(new ThreadFactory() { + @Override + public Thread newThread(Runnable runnable) { + final AtomicLong threadCount = new AtomicLong(0); + Thread thread = new Thread(runnable); + thread.setName("shardConsumers-" + subtaskName + "-thread-" + threadCount.getAndIncrement()); + thread.setDaemon(true); + return thread; + } + }); + } + + /** + * Utility function to create an initial map of the last discovered shard id of each subscribed stream, set to null; + * This is called in the constructor; correct values will be set later on by calling advanceLastDiscoveredShardOfStream() + * + * @param streams the list of subscribed streams + * @return the initial map for subscribedStreamsToLastDiscoveredShardIds + */ + protected static HashMap<String, String> createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) { + HashMap<String, String> initial = new HashMap<>(); + for (String stream : streams) { + initial.put(stream, null); + } + return initial; + } +}
