http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml deleted file mode 100644 index 45b3b92..0000000 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/resources/logback-test.xml +++ /dev/null @@ -1,30 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> - <logger name="org.apache.flink.streaming" level="WARN"/> -</configuration> \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/pom.xml b/flink-streaming-connectors/flink-connector-kinesis/pom.xml deleted file mode 100644 index 29170ad..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/pom.xml +++ /dev/null @@ -1,164 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-connectors</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-connector-kinesis_2.10</artifactId> - <name>flink-connector-kinesis</name> - <properties> - <aws.sdk.version>1.10.71</aws.sdk.version> - <aws.kinesis-kcl.version>1.6.2</aws.kinesis-kcl.version> - <aws.kinesis-kpl.version>0.10.2</aws.kinesis-kpl.version> - </properties> - - <packaging>jar</packaging> - - <dependencies> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_2.10</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <!-- Note: - The below dependencies are licenced under the Amazon Software License. - Flink includes the "flink-connector-kinesis" only as an optional dependency for that reason. - --> - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-kinesis</artifactId> - <version>${aws.sdk.version}</version> - </dependency> - - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>amazon-kinesis-producer</artifactId> - <version>${aws.kinesis-kpl.version}</version> - </dependency> - - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>amazon-kinesis-client</artifactId> - <version>${aws.kinesis-kcl.version}</version> - <!-- - We're excluding the below from the KCL since we'll only be using the - com.amazonaws.services.kinesis.clientlibrary.types.UserRecord class, which will not need these dependencies. - --> - <exclusions> - <exclusion> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-dynamodb</artifactId> - </exclusion> - <exclusion> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-cloudwatch</artifactId> - </exclusion> - </exclusions> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <id>shade-flink</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <promoteTransitiveDependencies>true</promoteTransitiveDependencies> - <artifactSet combine.children="append"> - <includes> - <include>com.amazonaws:*</include> - <include>com.google.protobuf:*</include> - </includes> - </artifactSet> - <relocations combine.children="override"> - <!-- DO NOT RELOCATE GUAVA IN THIS PACKAGE --> - <relocation> - <pattern>org.objectweb.asm</pattern> - <shadedPattern>org.apache.flink.shaded.org.objectweb.asm</shadedPattern> - </relocation> - <relocation> - <pattern>com.google.protobuf</pattern> - <shadedPattern>org.apache.flink.kinesis.shaded.com.google.protobuf</shadedPattern> - </relocation> - <relocation> - <pattern>com.amazonaws</pattern> - <shadedPattern>org.apache.flink.kinesis.shaded.com.amazonaws</shadedPattern> - </relocation> - </relocations> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java deleted file mode 100644 index a62dc10..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kinesis; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; -import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; -import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; -import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; -import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; -import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; -import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.List; -import java.util.Properties; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The Flink Kinesis Consumer is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis - * streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is - * responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will - * change as shards are closed and created by Kinesis. - * - * <p>To leverage Flink's checkpointing mechanics for exactly-once streaming processing guarantees, the Flink Kinesis - * consumer is implemented with the AWS Java SDK, instead of the officially recommended AWS Kinesis Client Library, for - * low-level control on the management of stream state. The Flink Kinesis Connector also supports setting the initial - * starting points of Kinesis streams, namely TRIM_HORIZON and LATEST.</p> - * - * @param <T> the type of data emitted - */ -public class FlinkKinesisConsumer<T> extends RichParallelSourceFunction<T> - implements CheckpointedAsynchronously<HashMap<KinesisStreamShard, SequenceNumber>>, ResultTypeQueryable<T> { - - private static final long serialVersionUID = 4724006128720664870L; - - private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisConsumer.class); - - // ------------------------------------------------------------------------ - // Consumer properties - // ------------------------------------------------------------------------ - - /** The names of the Kinesis streams that we will be consuming from */ - private final List<String> streams; - - /** Properties to parametrize settings such as AWS service region, initial position in stream, - * shard list retrieval behaviours, etc */ - private final Properties configProps; - - /** User supplied deseriliazation schema to convert Kinesis byte messages to Flink objects */ - private final KinesisDeserializationSchema<T> deserializer; - - // ------------------------------------------------------------------------ - // Runtime state - // ------------------------------------------------------------------------ - - /** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards */ - private transient KinesisDataFetcher<T> fetcher; - - /** The sequence numbers in the last state snapshot of this subtask */ - private transient HashMap<KinesisStreamShard, SequenceNumber> lastStateSnapshot; - - /** The sequence numbers to restore to upon restore from failure */ - private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore; - - private volatile boolean running = true; - - - // ------------------------------------------------------------------------ - // Constructors - // ------------------------------------------------------------------------ - - /** - * Creates a new Flink Kinesis Consumer. - * - * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming - * from are configured with a {@link Properties} instance.</p> - * - * @param stream - * The single AWS Kinesis stream to read from. - * @param deserializer - * The deserializer used to convert raw bytes of Kinesis records to Java objects (without key). - * @param configProps - * The properties used to configure AWS credentials, AWS region, and initial starting position. - */ - public FlinkKinesisConsumer(String stream, DeserializationSchema<T> deserializer, Properties configProps) { - this(stream, new KinesisDeserializationSchemaWrapper<>(deserializer), configProps); - } - - /** - * Creates a new Flink Kinesis Consumer. - * - * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming - * from are configured with a {@link Properties} instance.</p> - * - * @param stream - * The single AWS Kinesis stream to read from. - * @param deserializer - * The keyed deserializer used to convert raw bytes of Kinesis records to Java objects. - * @param configProps - * The properties used to configure AWS credentials, AWS region, and initial starting position. - */ - public FlinkKinesisConsumer(String stream, KinesisDeserializationSchema<T> deserializer, Properties configProps) { - this(Collections.singletonList(stream), deserializer, configProps); - } - - /** - * Creates a new Flink Kinesis Consumer. - * - * <p>The AWS credentials to be used, AWS region of the Kinesis streams, initial position to start streaming - * from are configured with a {@link Properties} instance.</p> - * - * @param streams - * The AWS Kinesis streams to read from. - * @param deserializer - * The keyed deserializer used to convert raw bytes of Kinesis records to Java objects. - * @param configProps - * The properties used to configure AWS credentials, AWS region, and initial starting position. - */ - public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T> deserializer, Properties configProps) { - checkNotNull(streams, "streams can not be null"); - checkArgument(streams.size() != 0, "must be consuming at least 1 stream"); - checkArgument(!streams.contains(""), "stream names cannot be empty Strings"); - this.streams = streams; - - this.configProps = checkNotNull(configProps, "configProps can not be null"); - - // check the configuration properties for any conflicting settings - KinesisConfigUtil.validateConsumerConfiguration(this.configProps); - - this.deserializer = checkNotNull(deserializer, "deserializer can not be null"); - - if (LOG.isInfoEnabled()) { - StringBuilder sb = new StringBuilder(); - for (String stream : streams) { - sb.append(stream).append(", "); - } - LOG.info("Flink Kinesis Consumer is going to read the following streams: {}", sb.toString()); - } - } - - // ------------------------------------------------------------------------ - // Source life cycle - // ------------------------------------------------------------------------ - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - // restore to the last known sequence numbers from the latest complete snapshot - if (sequenceNumsToRestore != null) { - if (LOG.isInfoEnabled()) { - LOG.info("Subtask {} is restoring sequence numbers {} from previous checkpointed state", - getRuntimeContext().getIndexOfThisSubtask(), sequenceNumsToRestore.toString()); - } - - // initialize sequence numbers with restored state - lastStateSnapshot = sequenceNumsToRestore; - } else { - // start fresh with empty sequence numbers if there are no snapshots to restore from. - lastStateSnapshot = new HashMap<>(); - } - } - - @Override - public void run(SourceContext<T> sourceContext) throws Exception { - - // all subtasks will run a fetcher, regardless of whether or not the subtask will initially have - // shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks - // can potentially have new shards to subscribe to later on - fetcher = new KinesisDataFetcher<>( - streams, sourceContext, getRuntimeContext(), configProps, deserializer); - - boolean isRestoringFromFailure = (sequenceNumsToRestore != null); - fetcher.setIsRestoringFromFailure(isRestoringFromFailure); - - // if we are restoring from a checkpoint, we iterate over the restored - // state and accordingly seed the fetcher with subscribed shards states - if (isRestoringFromFailure) { - for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) { - fetcher.advanceLastDiscoveredShardOfStream( - restored.getKey().getStreamName(), restored.getKey().getShard().getShardId()); - - if (LOG.isInfoEnabled()) { - LOG.info("Subtask {} is seeding the fetcher with restored shard {}," + - " starting state set to the restored sequence number {}", - getRuntimeContext().getIndexOfThisSubtask(), restored.getKey().toString(), restored.getValue()); - } - fetcher.registerNewSubscribedShardState( - new KinesisStreamShardState(restored.getKey(), restored.getValue())); - } - } - - // check that we are running before starting the fetcher - if (!running) { - return; - } - - // start the fetcher loop. The fetcher will stop running only when cancel() or - // close() is called, or an error is thrown by threads created by the fetcher - fetcher.runFetcher(); - - // check that the fetcher has terminated before fully closing - fetcher.awaitTermination(); - sourceContext.close(); - } - - @Override - public void cancel() { - running = false; - - KinesisDataFetcher fetcher = this.fetcher; - this.fetcher = null; - - // this method might be called before the subtask actually starts running, - // so we must check if the fetcher is actually created - if (fetcher != null) { - try { - // interrupt the fetcher of any work - fetcher.shutdownFetcher(); - fetcher.awaitTermination(); - } catch (Exception e) { - LOG.warn("Error while closing Kinesis data fetcher", e); - } - } - } - - @Override - public void close() throws Exception { - cancel(); - super.close(); - } - - @Override - public TypeInformation<T> getProducedType() { - return deserializer.getProducedType(); - } - - // ------------------------------------------------------------------------ - // State Snapshot & Restore - // ------------------------------------------------------------------------ - - @Override - public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - if (lastStateSnapshot == null) { - LOG.debug("snapshotState() requested on not yet opened source; returning null."); - return null; - } - - if (fetcher == null) { - LOG.debug("snapshotState() requested on not yet running source; returning null."); - return null; - } - - if (!running) { - LOG.debug("snapshotState() called on closed source; returning null."); - return null; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Snapshotting state ..."); - } - - lastStateSnapshot = fetcher.snapshotState(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}", - lastStateSnapshot.toString(), checkpointId, checkpointTimestamp); - } - - return lastStateSnapshot; - } - - @Override - public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception { - sequenceNumsToRestore = restoredState; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java deleted file mode 100644 index 579bd6b..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kinesis; - -import com.amazonaws.services.kinesis.producer.Attempt; -import com.amazonaws.services.kinesis.producer.KinesisProducer; -import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; -import com.amazonaws.services.kinesis.producer.UserRecordFailedException; -import com.amazonaws.services.kinesis.producer.UserRecordResult; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.flink.api.java.ClosureCleaner; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; -import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerializationSchema; -import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; -import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flink.util.PropertiesUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Objects; -import java.util.Properties; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis. - * - * @param <OUT> Data type to produce into Kinesis Streams - */ -public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> { - - private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class); - - /** Properties to parametrize settings such as AWS service region, access key etc. */ - private final Properties configProps; - - /* Flag controlling the error behavior of the producer */ - private boolean failOnError = false; - - /* Name of the default stream to produce to. Can be overwritten by the serialization schema */ - private String defaultStream; - - /* Default partition id. Can be overwritten by the serialization schema */ - private String defaultPartition; - - /* Schema for turning the OUT type into a byte array. */ - private final KinesisSerializationSchema<OUT> schema; - - /* Optional custom partitioner */ - private KinesisPartitioner<OUT> customPartitioner = null; - - - // --------------------------- Runtime fields --------------------------- - - - /* Our Kinesis instance for each parallel Flink sink */ - private transient KinesisProducer producer; - - /* Callback handling failures */ - private transient FutureCallback<UserRecordResult> callback; - - /* Field for async exception */ - private transient volatile Throwable thrownException; - - - // --------------------------- Initialization and configuration --------------------------- - - - /** - * Create a new FlinkKinesisProducer. - * This is a constructor supporting Flink's {@see SerializationSchema}. - * - * @param schema Serialization schema for the data type - * @param configProps The properties used to configure AWS credentials and AWS region - */ - public FlinkKinesisProducer(final SerializationSchema<OUT> schema, Properties configProps) { - - // create a simple wrapper for the serialization schema - this(new KinesisSerializationSchema<OUT>() { - @Override - public ByteBuffer serialize(OUT element) { - // wrap into ByteBuffer - return ByteBuffer.wrap(schema.serialize(element)); - } - // use default stream and hash key - @Override - public String getTargetStream(OUT element) { - return null; - } - }, configProps); - } - - /** - * Create a new FlinkKinesisProducer. - * This is a constructor supporting {@see KinesisSerializationSchema}. - * - * @param schema Kinesis serialization schema for the data type - * @param configProps The properties used to configure AWS credentials and AWS region - */ - public FlinkKinesisProducer(KinesisSerializationSchema<OUT> schema, Properties configProps) { - this.configProps = checkNotNull(configProps, "configProps can not be null"); - - // check the configuration properties for any conflicting settings - KinesisConfigUtil.validateProducerConfiguration(this.configProps); - - ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema)); - this.schema = schema; - } - - /** - * If set to true, the producer will immediately fail with an exception on any error. - * Otherwise, the errors are logged and the producer goes on. - * - * @param failOnError Error behavior flag - */ - public void setFailOnError(boolean failOnError) { - this.failOnError = failOnError; - } - - /** - * Set a default stream name. - * @param defaultStream Name of the default Kinesis stream - */ - public void setDefaultStream(String defaultStream) { - this.defaultStream = defaultStream; - } - - /** - * Set default partition id - * @param defaultPartition Name of the default partition - */ - public void setDefaultPartition(String defaultPartition) { - this.defaultPartition = defaultPartition; - } - - public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) { - Objects.requireNonNull(partitioner); - ClosureCleaner.ensureSerializable(partitioner); - this.customPartitioner = partitioner; - } - - - // --------------------------- Lifecycle methods --------------------------- - - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration(); - - producerConfig.setRegion(configProps.getProperty(ProducerConfigConstants.AWS_REGION)); - producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps)); - if (configProps.containsKey(ProducerConfigConstants.COLLECTION_MAX_COUNT)) { - producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps, - ProducerConfigConstants.COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG)); - } - if (configProps.containsKey(ProducerConfigConstants.AGGREGATION_MAX_COUNT)) { - producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps, - ProducerConfigConstants.AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG)); - } - - producer = new KinesisProducer(producerConfig); - callback = new FutureCallback<UserRecordResult>() { - @Override - public void onSuccess(UserRecordResult result) { - if (!result.isSuccessful()) { - if(failOnError) { - thrownException = new RuntimeException("Record was not sent successful"); - } else { - LOG.warn("Record was not sent successful"); - } - } - } - - @Override - public void onFailure(Throwable t) { - if (failOnError) { - thrownException = t; - } else { - LOG.warn("An exception occurred while processing a record", t); - } - } - }; - - if (this.customPartitioner != null) { - this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); - } - - LOG.info("Started Kinesis producer instance for region '{}'", producerConfig.getRegion()); - } - - @Override - public void invoke(OUT value) throws Exception { - if (this.producer == null) { - throw new RuntimeException("Kinesis producer has been closed"); - } - if (thrownException != null) { - String errorMessages = ""; - if (thrownException instanceof UserRecordFailedException) { - List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts(); - for (Attempt attempt: attempts) { - if (attempt.getErrorMessage() != null) { - errorMessages += attempt.getErrorMessage() +"\n"; - } - } - } - if (failOnError) { - throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException); - } else { - LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages); - thrownException = null; // reset - } - } - - String stream = defaultStream; - String partition = defaultPartition; - - ByteBuffer serialized = schema.serialize(value); - - // maybe set custom stream - String customStream = schema.getTargetStream(value); - if (customStream != null) { - stream = customStream; - } - - String explicitHashkey = null; - // maybe set custom partition - if (customPartitioner != null) { - partition = customPartitioner.getPartitionId(value); - explicitHashkey = customPartitioner.getExplicitHashKey(value); - } - - if (stream == null) { - if (failOnError) { - throw new RuntimeException("No target stream set"); - } else { - LOG.warn("No target stream set. Skipping record"); - return; - } - } - - ListenableFuture<UserRecordResult> cb = producer.addUserRecord(stream, partition, explicitHashkey, serialized); - Futures.addCallback(cb, callback); - } - - @Override - public void close() throws Exception { - LOG.info("Closing producer"); - super.close(); - KinesisProducer kp = this.producer; - this.producer = null; - if (kp != null) { - LOG.info("Flushing outstanding {} records", kp.getOutstandingRecordsCount()); - // try to flush all outstanding records - while (kp.getOutstandingRecordsCount() > 0) { - kp.flush(); - try { - Thread.sleep(500); - } catch (InterruptedException e) { - LOG.warn("Flushing was interrupted."); - // stop the blocking flushing and destroy producer immediately - break; - } - } - LOG.info("Flushing done. Destroying producer instance."); - kp.destroy(); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java deleted file mode 100644 index bd23abe..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/KinesisPartitioner.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kinesis; - - -import java.io.Serializable; - -public abstract class KinesisPartitioner<T> implements Serializable { - - /** - * Return a partition id based on the input - * @param element Element to partition - * @return A string representing the partition id - */ - public abstract String getPartitionId(T element); - - /** - * Optional method for setting an explicit hash key - * @param element Element to get the hash key for - * @return the hash key for the element - */ - public String getExplicitHashKey(T element) { - return null; - } - - /** - * Optional initializer. - * - * @param indexOfThisSubtask Index of this partitioner instance - * @param numberOfParallelSubtasks Total number of parallel instances - */ - public void initialize(int indexOfThisSubtask, int numberOfParallelSubtasks) { - // - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java deleted file mode 100644 index 01d4f00..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/AWSConfigConstants.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kinesis.config; - -import com.amazonaws.auth.AWSCredentialsProvider; - -/** - * Configuration keys for AWS service usage - */ -public class AWSConfigConstants { - - /** - * Possible configuration values for the type of credential provider to use when accessing AWS Kinesis. - * Internally, a corresponding implementation of {@link AWSCredentialsProvider} will be used. - */ - public enum CredentialProvider { - - /** Look for the environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY to create AWS credentials */ - ENV_VAR, - - /** Look for Java system properties aws.accessKeyId and aws.secretKey to create AWS credentials */ - SYS_PROP, - - /** Use a AWS credentials profile file to create the AWS credentials */ - PROFILE, - - /** Simply create AWS credentials by supplying the AWS access key ID and AWS secret key in the configuration properties */ - BASIC, - - /** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata **/ - AUTO, - } - - /** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */ - public static final String AWS_REGION = "aws.region"; - - /** The AWS access key ID to use when setting credentials provider type to BASIC */ - public static final String AWS_ACCESS_KEY_ID = "aws.credentials.provider.basic.accesskeyid"; - - /** The AWS secret key to use when setting credentials provider type to BASIC */ - public static final String AWS_SECRET_ACCESS_KEY = "aws.credentials.provider.basic.secretkey"; - - /** The credential provider type to use when AWS credentials are required (BASIC is used if not set)*/ - public static final String AWS_CREDENTIALS_PROVIDER = "aws.credentials.provider"; - - /** Optional configuration for profile path if credential provider type is set to be PROFILE */ - public static final String AWS_PROFILE_PATH = "aws.credentials.provider.profile.path"; - - /** Optional configuration for profile name if credential provider type is set to be PROFILE */ - public static final String AWS_PROFILE_NAME = "aws.credentials.provider.profile.name"; - - /** The AWS endpoint for Kinesis (derived from the AWS region setting if not set) */ - public static final String AWS_ENDPOINT = "aws.endpoint"; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java deleted file mode 100644 index 76c20ed..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kinesis.config; - -import com.amazonaws.services.kinesis.model.ShardIteratorType; -import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; -import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer; -import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; - -/** - * Optional consumer specific configuration keys and default values for {@link FlinkKinesisConsumer} - */ -public class ConsumerConfigConstants extends AWSConfigConstants { - - /** - * The initial position to start reading shards from. This will affect the {@link ShardIteratorType} used - * when the consumer tasks retrieve the first shard iterator for each Kinesis shard. - */ - public enum InitialPosition { - - /** Start reading from the earliest possible record in the stream (excluding expired data records) */ - TRIM_HORIZON(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM), - - /** Start reading from the latest incoming record */ - LATEST(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM); - - private SentinelSequenceNumber sentinelSequenceNumber; - - InitialPosition(SentinelSequenceNumber sentinelSequenceNumber) { - this.sentinelSequenceNumber = sentinelSequenceNumber; - } - - public SentinelSequenceNumber toSentinelSequenceNumber() { - return this.sentinelSequenceNumber; - } - } - - /** The initial position to start reading Kinesis streams from (LATEST is used if not set) */ - public static final String STREAM_INITIAL_POSITION = "flink.stream.initpos"; - - /** The base backoff time between each describeStream attempt */ - public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base"; - - /** The maximum backoff time between each describeStream attempt */ - public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max"; - - /** The power constant for exponential backoff between each describeStream attempt */ - public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst"; - - /** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard */ - public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount"; - - /** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException */ - public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries"; - - /** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */ - public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base"; - - /** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException */ - public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max"; - - /** The power constant for exponential backoff between each getRecords attempt */ - public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst"; - - /** The interval between each getRecords request to a AWS Kinesis shard in milliseconds */ - public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis"; - - /** The maximum number of getShardIterator attempts if we get ProvisionedThroughputExceededException */ - public static final String SHARD_GETITERATOR_RETRIES = "flink.shard.getiterator.maxretries"; - - /** The base backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */ - public static final String SHARD_GETITERATOR_BACKOFF_BASE = "flink.shard.getiterator.backoff.base"; - - /** The maximum backoff time between getShardIterator attempts if we get a ProvisionedThroughputExceededException */ - public static final String SHARD_GETITERATOR_BACKOFF_MAX = "flink.shard.getiterator.backoff.max"; - - /** The power constant for exponential backoff between each getShardIterator attempt */ - public static final String SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getiterator.backoff.expconst"; - - /** The interval between each attempt to discover new shards */ - public static final String SHARD_DISCOVERY_INTERVAL_MILLIS = "flink.shard.discovery.intervalmillis"; - - // ------------------------------------------------------------------------ - // Default values for consumer configuration - // ------------------------------------------------------------------------ - - public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString(); - - public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L; - - public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L; - - public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; - - public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100; - - public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3; - - public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L; - - public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L; - - public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; - - public static final long DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS = 0L; - - public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3; - - public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L; - - public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L; - - public static final double DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; - - public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L; - - /** - * To avoid shard iterator expires in {@link ShardConsumer}s, the value for the configured - * getRecords interval can not exceed 5 minutes, which is the expire time for retrieved iterators. - */ - public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java deleted file mode 100644 index 1edddfc..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ProducerConfigConstants.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kinesis.config; - -import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; - -/** - * Optional producer specific configuration keys for {@link FlinkKinesisProducer} - */ -public class ProducerConfigConstants extends AWSConfigConstants { - - /** Maximum number of items to pack into an PutRecords request. **/ - public static final String COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount"; - - /** Maximum number of items to pack into an aggregated record. **/ - public static final String AGGREGATION_MAX_COUNT = "aws.producer.aggregationMaxCount"; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java deleted file mode 100644 index 55668c6..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ConsumeFromKinesis.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kinesis.examples; - -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; -import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; - -import java.util.Properties; - -/** - * This is an example on how to consume data from Kinesis - */ -public class ConsumeFromKinesis { - - public static void main(String[] args) throws Exception { - ParameterTool pt = ParameterTool.fromArgs(args); - - StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); - see.setParallelism(1); - - Properties kinesisConsumerConfig = new Properties(); - kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, pt.getRequired("region")); - kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accesskey")); - kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretkey")); - - DataStream<String> kinesis = see.addSource(new FlinkKinesisConsumer<>( - "flink-test", - new SimpleStringSchema(), - kinesisConsumerConfig)); - - kinesis.print(); - - see.execute(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java deleted file mode 100644 index d178137..0000000 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/examples/ProduceIntoKinesis.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kinesis.examples; - -import org.apache.commons.lang3.RandomStringUtils; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; -import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConstants; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; - -import java.util.Properties; - -/** - * This is an example on how to produce data into Kinesis - */ -public class ProduceIntoKinesis { - - public static void main(String[] args) throws Exception { - ParameterTool pt = ParameterTool.fromArgs(args); - - StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); - see.setParallelism(1); - - DataStream<String> simpleStringStream = see.addSource(new EventsGenerator()); - - Properties kinesisProducerConfig = new Properties(); - kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_REGION, pt.getRequired("region")); - kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_ACCESS_KEY_ID, pt.getRequired("accessKey")); - kinesisProducerConfig.setProperty(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, pt.getRequired("secretKey")); - - FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>( - new SimpleStringSchema(), kinesisProducerConfig); - - kinesis.setFailOnError(true); - kinesis.setDefaultStream("flink-test"); - kinesis.setDefaultPartition("0"); - - simpleStringStream.addSink(kinesis); - - see.execute(); - } - - public static class EventsGenerator implements SourceFunction<String> { - private boolean running = true; - - @Override - public void run(SourceContext<String> ctx) throws Exception { - long seq = 0; - while(running) { - Thread.sleep(10); - ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12)); - } - } - - @Override - public void cancel() { - running = false; - } - } -}
