[ https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247639#comment-15247639 ]
ASF GitHub Bot commented on FLINK-3230: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1910#discussion_r60219813 --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.kinesis; + + +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.internal.StaticCredentialsProvider; +import com.amazonaws.services.kinesis.producer.Attempt; +import com.amazonaws.services.kinesis.producer.KinesisProducer; +import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; +import com.amazonaws.services.kinesis.producer.UserRecordFailedException; +import com.amazonaws.services.kinesis.producer.UserRecordResult; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; + +/** + * The FlinkKinesisProducer allows to produce from a Flink DataStream into Kinesis. + * + * @param <OUT> Data type to produce into Kinesis Streams + */ +public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkKinesisProducer.class); + + /* AWS region of the stream */ + private final String region; + + /* Access and secret key of the user */ + private final String accessKey; + private final String secretKey; + + /* Flag controlling the error behavior of the producer */ + private boolean failOnError = false; + + /* Name of the default stream to produce to. Can be overwritten by the serialization schema */ + private String defaultStream; + + /* Default partition id. Can be overwritten by the serialization schema */ + private String defaultPartition; + + /* Schema for turning the OUT type into a byte array. 
*/ + private final KinesisSerializationSchema<OUT> schema; + + /* Optional custom partitioner */ + private KinesisPartitioner<OUT> customPartitioner = null; + + + // --------------------------- Runtime fields --------------------------- + + + /* Our Kinesis instance for each parallel Flink sink */ + private transient KinesisProducer producer; + + /* Callback handling failures */ + private transient FutureCallback<UserRecordResult> callback; + + /* Field for async exception */ + private transient Throwable thrownException; + + + // --------------------------- Initialization and configuration --------------------------- + + + /** + * Create a new FlinkKinesisProducer. + * This is a constructor supporting Flink's {@see SerializationSchema}. + * + * @param region AWS region of the stream + * @param accessKey Access key of a user with permission to access the stream (ideally also with access to Cloud Watch) + * @param secretKey Secret key of the user + * @param schema Serialization schema for the data type + */ + public FlinkKinesisProducer(String region, String accessKey, String secretKey, final SerializationSchema<OUT> schema) { + // create a simple wrapper for the serialization schema + this(region, accessKey, secretKey, new KinesisSerializationSchema<OUT>() { + @Override + public ByteBuffer serialize(OUT element) { + // wrap into ByteBuffer + return ByteBuffer.wrap(schema.serialize(element)); + } + // use default stream and hash key + @Override + public String getTargetStream(OUT element) { + return null; + } + }); + } + + public FlinkKinesisProducer(String region, String accessKey, String secretKey, KinesisSerializationSchema<OUT> schema) { + this.region = Objects.requireNonNull(region); + this.accessKey = Objects.requireNonNull(accessKey); + this.secretKey = Objects.requireNonNull(secretKey); + ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema)); + this.schema = schema; + } + + /** + * If set to true, the producer will immediately fail with an 
exception on any error. + * Otherwise, the errors are logged and the producer goes on. + * + * @param failOnError Error behavior flag + */ + public void setFailOnError(boolean failOnError) { + this.failOnError = failOnError; + } + + /** + * Set a default stream name. + * @param defaultStream Name of the default Kinesis stream + */ + public void setDefaultStream(String defaultStream) { + this.defaultStream = defaultStream; + } + + /** + * Set default partition id + * @param defaultPartition Name of the default partition + */ + public void setDefaultPartition(String defaultPartition) { + this.defaultPartition = defaultPartition; + } + + public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) { + Objects.requireNonNull(partitioner); + ClosureCleaner.ensureSerializable(partitioner); + this.customPartitioner = partitioner; + } + + + // --------------------------- Lifecycle methods --------------------------- + + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + KinesisProducerConfiguration config = new KinesisProducerConfiguration(); + config.setRegion(this.region); + config.setCredentialsProvider(new StaticCredentialsProvider(new BasicAWSCredentials(this.accessKey, this.secretKey))); + producer = new KinesisProducer(config); + callback = new FutureCallback<UserRecordResult>() { + @Override + public void onSuccess(UserRecordResult result) { + if(!result.isSuccessful()) { + if(failOnError) { + thrownException = new RuntimeException("Record was not sent successful"); + } else { + LOG.warn("Record was not sent successful"); + } + } + } + + @Override + public void onFailure(Throwable t) { + if(failOnError) { + thrownException = t; + } else { + LOG.warn("An exception occurred while processing a record", t); + } + } + }; + + if(this.customPartitioner != null) { + this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } + + 
LOG.info("Started Kinesis producer instance for region '{}'", this.region); + } + + @Override + public void invoke(OUT value) throws Exception { + if(this.producer == null) { + throw new RuntimeException("Kinesis producer has been closed"); + } + if(thrownException != null) { + String errorMessages = ""; + if(thrownException instanceof UserRecordFailedException) { + List<Attempt> attempts = ((UserRecordFailedException) thrownException).getResult().getAttempts(); + for(Attempt attempt: attempts) { + if(attempt.getErrorMessage() != null) { + errorMessages += attempt.getErrorMessage() +"\n"; + } + } + } + if(failOnError) { + throw new RuntimeException("An exception was thrown while processing a record: " + errorMessages, thrownException); + } else { + LOG.warn("An exception was thrown while processing a record: {}", thrownException, errorMessages); + thrownException = null; // reset + } + } + + String stream = defaultStream; + String partition = defaultPartition; + + ByteBuffer serialized = schema.serialize(value); + + // maybe set custom stream + String customStream = schema.getTargetStream(value); + if(customStream != null) { + stream = customStream; + } + + String explicitHashkey = null; + // maybe set custom partition + if(customPartitioner != null) { + partition = customPartitioner.getPartitionId(value); + explicitHashkey = customPartitioner.getExplicitHashKey(value); + } + + if(stream == null) { + if(failOnError) { + throw new RuntimeException("No target stream set"); + } else { + LOG.warn("No target stream set. Skipping record"); --- End diff -- the schema can set a target stream on each record. So it can happen that a user didn't specify a default stream but the schema returns a valid stream name for 99% of the data. 
> Kinesis streaming producer > -------------------------- > > Key: FLINK-3230 > URL: https://issues.apache.org/jira/browse/FLINK-3230 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors > Reporter: Tzu-Li (Gordon) Tai > Assignee: Robert Metzger > > Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will > be using the AWS SDK implementation for code consistency with the > FlinkKinesisConsumer. > The features of FlinkKinesisProducer are rather straightforward: > 1. Partition put records based on partition key. > 2. Configurable put mode: Bulk put for higher throughput vs. sequential > single record puts. Size of bulk should also be configurable. > 3. For bulk put, user can also choose to enforce strict ordering of the > result with the tradeoff of higher put latency. Ref: > https://brandur.org/kinesis-order -- This message was sent by Atlassian JIRA (v6.3.4#6332)