[ 
https://issues.apache.org/jira/browse/FLINK-3230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15247639#comment-15247639
 ] 

ASF GitHub Bot commented on FLINK-3230:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60219813
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kinesis;
    +
    +
    +import com.amazonaws.auth.BasicAWSCredentials;
    +import com.amazonaws.internal.StaticCredentialsProvider;
    +import com.amazonaws.services.kinesis.producer.Attempt;
    +import com.amazonaws.services.kinesis.producer.KinesisProducer;
    +import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    +import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
    +import com.amazonaws.services.kinesis.producer.UserRecordResult;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.ListenableFuture;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.nio.ByteBuffer;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
    + *
    + * @param <OUT> Data type to produce into Kinesis Streams
    + */
    +public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
    +
    +   /* AWS region of the stream */
    +   private final String region;
    +
    +   /* Access and secret key of the user */
    +   private final String accessKey;
    +   private final String secretKey;
    +
    +   /* Flag controlling the error behavior of the producer */
    +   private boolean failOnError = false;
    +
    +   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
    +   private String defaultStream;
    +
    +   /* Default partition id. Can be overwritten by the serialization schema 
*/
    +   private String defaultPartition;
    +
    +   /* Schema for turning the OUT type into a byte array. */
    +   private final KinesisSerializationSchema<OUT> schema;
    +
    +   /* Optional custom partitioner */
    +   private KinesisPartitioner<OUT> customPartitioner = null;
    +
    +
    +   // --------------------------- Runtime fields 
---------------------------
    +
    +
    +   /* Our Kinesis instance for each parallel Flink sink */
    +   private transient KinesisProducer producer;
    +
    +   /* Callback handling failures */
    +   private transient FutureCallback<UserRecordResult> callback;
    +
    +   /* Field for async exception */
    +   private transient Throwable thrownException;
    +
    +
    +   // --------------------------- Initialization and configuration  
---------------------------
    +
    +
    +   /**
    +    * Create a new FlinkKinesisProducer.
    +    * This is a constructor supporting Flink's {@see SerializationSchema}.
    +    *
    +    * @param region AWS region of the stream
    +    * @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
    +    * @param secretKey Secret key of the user
    +    * @param schema Serialization schema for the data type
    +    */
    +   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema<OUT> schema) {
    +           // create a simple wrapper for the serialization schema
    +           this(region, accessKey, secretKey, new 
KinesisSerializationSchema<OUT>() {
    +                   @Override
    +                   public ByteBuffer serialize(OUT element) {
    +                           // wrap into ByteBuffer
    +                           return 
ByteBuffer.wrap(schema.serialize(element));
    +                   }
    +                   // use default stream and hash key
    +                   @Override
    +                   public String getTargetStream(OUT element) {
    +                           return null;
    +                   }
    +           });
    +   }
    +
    +   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, KinesisSerializationSchema<OUT> schema) {
    +           this.region = Objects.requireNonNull(region);
    +           this.accessKey = Objects.requireNonNull(accessKey);
    +           this.secretKey = Objects.requireNonNull(secretKey);
    +           
ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema));
    +           this.schema = schema;
    +   }
    +
    +   /**
    +    * If set to true, the producer will immediately fail with an exception 
on any error.
    +    * Otherwise, the errors are logged and the producer goes on.
    +    *
    +    * @param failOnError Error behavior flag
    +    */
    +   public void setFailOnError(boolean failOnError) {
    +           this.failOnError = failOnError;
    +   }
    +
    +   /**
    +    * Set a default stream name.
    +    * @param defaultStream Name of the default Kinesis stream
    +    */
    +   public void setDefaultStream(String defaultStream) {
    +           this.defaultStream = defaultStream;
    +   }
    +
    +   /**
    +    * Set default partition id
    +    * @param defaultPartition Name of the default partition
    +    */
    +   public void setDefaultPartition(String defaultPartition) {
    +           this.defaultPartition = defaultPartition;
    +   }
    +
    +   public void setCustomPartitioner(KinesisPartitioner<OUT> partitioner) {
    +           Objects.requireNonNull(partitioner);
    +           ClosureCleaner.ensureSerializable(partitioner);
    +           this.customPartitioner = partitioner;
    +   }
    +
    +
    +   // --------------------------- Lifecycle methods 
---------------------------
    +
    +
    +   @Override
    +   public void open(Configuration parameters) throws Exception {
    +           super.open(parameters);
    +
    +           KinesisProducerConfiguration config = new 
KinesisProducerConfiguration();
    +           config.setRegion(this.region);
    +           config.setCredentialsProvider(new StaticCredentialsProvider(new 
BasicAWSCredentials(this.accessKey, this.secretKey)));
    +           producer = new KinesisProducer(config);
    +           callback = new FutureCallback<UserRecordResult>() {
    +                   @Override
    +                   public void onSuccess(UserRecordResult result) {
    +                           if(!result.isSuccessful()) {
    +                                   if(failOnError) {
    +                                           thrownException = new 
RuntimeException("Record was not sent successful");
    +                                   } else {
    +                                           LOG.warn("Record was not sent 
successful");
    +                                   }
    +                           }
    +                   }
    +
    +                   @Override
    +                   public void onFailure(Throwable t) {
    +                           if(failOnError) {
    +                                   thrownException = t;
    +                           } else {
    +                                   LOG.warn("An exception occurred while 
processing a record", t);
    +                           }
    +                   }
    +           };
    +
    +           if(this.customPartitioner != null) {
    +                   
this.customPartitioner.initialize(getRuntimeContext().getIndexOfThisSubtask(), 
getRuntimeContext().getNumberOfParallelSubtasks());
    +           }
    +
    +           LOG.info("Started Kinesis producer instance for region '{}'", 
this.region);
    +   }
    +
    +   @Override
    +   public void invoke(OUT value) throws Exception {
    +           if(this.producer == null) {
    +                   throw new RuntimeException("Kinesis producer has been 
closed");
    +           }
    +           if(thrownException != null) {
    +                   String errorMessages = "";
    +                   if(thrownException instanceof 
UserRecordFailedException) {
    +                           List<Attempt> attempts = 
((UserRecordFailedException) thrownException).getResult().getAttempts();
    +                           for(Attempt attempt: attempts) {
    +                                   if(attempt.getErrorMessage() != null) {
    +                                           errorMessages += 
attempt.getErrorMessage() +"\n";
    +                                   }
    +                           }
    +                   }
    +                   if(failOnError) {
    +                           throw new RuntimeException("An exception was 
thrown while processing a record: " + errorMessages, thrownException);
    +                   } else {
    +                           LOG.warn("An exception was thrown while 
processing a record: {}", thrownException, errorMessages);
    +                           thrownException = null; // reset
    +                   }
    +           }
    +
    +           String stream = defaultStream;
    +           String partition = defaultPartition;
    +
    +           ByteBuffer serialized = schema.serialize(value);
    +
    +           // maybe set custom stream
    +           String customStream = schema.getTargetStream(value);
    +           if(customStream != null) {
    +                   stream = customStream;
    +           }
    +
    +           String explicitHashkey = null;
    +           // maybe set custom partition
    +           if(customPartitioner != null) {
    +                   partition = customPartitioner.getPartitionId(value);
    +                   explicitHashkey = 
customPartitioner.getExplicitHashKey(value);
    +           }
    +
    +           if(stream == null) {
    +                   if(failOnError) {
    +                           throw new RuntimeException("No target stream 
set");
    +                   } else {
    +                           LOG.warn("No target stream set. Skipping 
record");
    --- End diff --
    
    The schema can set a target stream on each record. So it can happen that a 
user didn't specify a default stream but the schema returns a valid stream name 
for 99% of the data.


> Kinesis streaming producer
> --------------------------
>
>                 Key: FLINK-3230
>                 URL: https://issues.apache.org/jira/browse/FLINK-3230
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Robert Metzger
>
> Add a FlinkKinesisProducer for the Flink Kinesis streaming connector. We will 
> be using the AWS SDK implementation for code consistency with the 
> FlinkKinesisConsumer.
> The features of FlinkKinesisProducer are rather straightforward:
> 1. Partition put records based on partition key.
> 2. Configurable put mode: Bulk put for higher throughput vs. sequential 
> single record puts. Size of bulk should also be configurable.
> 3. For bulk put, user can also choose to enforce strict ordering of the 
> result with the tradeoff of higher put latency. Ref: 
> https://brandur.org/kinesis-order



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to