Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1910#discussion_r60205778
  
    --- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
    @@ -0,0 +1,272 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.kinesis;
    +
    +
    +import com.amazonaws.auth.BasicAWSCredentials;
    +import com.amazonaws.internal.StaticCredentialsProvider;
    +import com.amazonaws.services.kinesis.producer.Attempt;
    +import com.amazonaws.services.kinesis.producer.KinesisProducer;
    +import 
com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    +import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
    +import com.amazonaws.services.kinesis.producer.UserRecordResult;
    +import com.google.common.util.concurrent.FutureCallback;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.ListenableFuture;
    +import org.apache.flink.api.java.ClosureCleaner;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.nio.ByteBuffer;
    +import java.util.List;
    +import java.util.Objects;
    +
    +/**
    + * The FlinkKinesisProducer allows to produce from a Flink DataStream into 
Kinesis.
    + *
    + * @param <OUT> Data type to produce into Kinesis Streams
    + */
    +public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
    +
    +   private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKinesisProducer.class);
    +
    +   /* AWS region of the stream */
    +   private final String region;
    +
    +   /* Access and secret key of the user */
    +   private final String accessKey;
    +   private final String secretKey;
    +
    +   /* Flag controlling the error behavior of the producer */
    +   private boolean failOnError = false;
    +
    +   /* Name of the default stream to produce to. Can be overwritten by the 
serialization schema */
    +   private String defaultStream;
    +
    +   /* Default partition id. Can be overwritten by the serialization schema 
*/
    +   private String defaultPartition;
    +
    +   /* Schema for turning the OUT type into a byte array. */
    +   private final KinesisSerializationSchema<OUT> schema;
    +
    +   /* Optional custom partitioner */
    +   private KinesisPartitioner<OUT> customPartitioner = null;
    +
    +
    +   // --------------------------- Runtime fields 
---------------------------
    +
    +
    +   /* Our Kinesis instance for each parallel Flink sink */
    +   private transient KinesisProducer producer;
    +
    +   /* Callback handling failures */
    +   private transient FutureCallback<UserRecordResult> callback;
    +
    +   /* Field for async exception */
    +   private transient Throwable thrownException;
    +
    +
    +   // --------------------------- Initialization and configuration  
---------------------------
    +
    +
    +   /**
    +    * Create a new FlinkKinesisProducer.
    +    * This is a constructor supporting Flink's {@see SerializationSchema}.
    +    *
    +    * @param region AWS region of the stream
    +    * @param accessKey Access key of a user with permission to access the 
stream (ideally also with access to Cloud Watch)
    +    * @param secretKey Secret key of the user
    +    * @param schema Serialization schema for the data type
    +    */
    +   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, final SerializationSchema<OUT> schema) {
    +           // create a simple wrapper for the serialization schema
    +           this(region, accessKey, secretKey, new 
KinesisSerializationSchema<OUT>() {
    +                   @Override
    +                   public ByteBuffer serialize(OUT element) {
    +                           // wrap into ByteBuffer
    +                           return 
ByteBuffer.wrap(schema.serialize(element));
    +                   }
    +                   // use default stream and hash key
    +                   @Override
    +                   public String getTargetStream(OUT element) {
    +                           return null;
    +                   }
    +           });
    +   }
    +
    +   public FlinkKinesisProducer(String region, String accessKey, String 
secretKey, KinesisSerializationSchema<OUT> schema) {
    +           this.region = Objects.requireNonNull(region);
    +           this.accessKey = Objects.requireNonNull(accessKey);
    +           this.secretKey = Objects.requireNonNull(secretKey);
    +           
ClosureCleaner.ensureSerializable(Objects.requireNonNull(schema));
    +           this.schema = schema;
    --- End diff --
    
    Ah didn't see it in the argument to the ClosureCleaner


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to