    [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpressuring

    ## What is the purpose of the change
    The `FlinkKinesisProducer` just accepts records and forwards it to a 
`KinesisProducer` from the Amazon Kinesis Producer Library (KPL). The KPL 
internally holds an unbounded queue of records that have not yet been sent.
    Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
grow indefinitely if Flink sends records faster than the KPL can forward them 
to Kinesis.
    One way to circumvent this problem is to set a record TTL, so that queued 
records are dropped after a certain amount of time, but this will lead to data 
loss under high loads.
    Currently the only time the queue is flushed is during checkpointing: 
`FlinkKinesisProducer` consumes records at arbitrary rate, either until a 
checkpoint is reached (and will wait until the queue is flushed), or until 
out-of-memory, whichever is reached first. (This gets worse due to the fact 
that the Java KPL is only a thin wrapper around a C++ process, so it is not 
even the Java process that runs out of memory, but the C++ process.)
    My proposed solution is to add a config option `queueLimit` to set a 
maximum number of records that may be waiting in the KPL queue. If this limit 
is reached, the `FlinkKinesisProducer` should trigger a `flush()` and wait 
(blocking) until the queue length is below the limit again. This automatically 
leads to backpressuring, since the `FlinkKinesisProducer` cannot accept records 
while waiting. For compatibility, `queueLimit` is set to `Integer.MAX_VALUE` by 
default, so the behavior is unchanged unless a client explicitly sets the 
value. Setting a »sane« default value is not possible unfortunately, since 
sensible values for the limit depend on the record size (the limit should be 
chosen so that about 10–100MB of records per shard are accumulated before 
flushing, otherwise the maximum Kinesis throughput may not be reached).
    ## Brief change log
    * Add a `queueLimit` setting to `FlinkKinesisProducer` to limit the number 
of in-flight records in the Kinesis Producer Library, and enable backpressuring 
if the limit is exceeded
    ## Verifying this change
    This change added tests and can be verified as follows:
    * Added unit test
    * Manually verified the change by running a job that produces to a 2-shard 
Kinesis stream. The input rate is limited by Kinesis (verified that the Kinesis 
stream is indeed at maximum capacity).
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes, but backwards compatible (option was added)
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): don't know
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
      - The S3 file system connector: no
    ## Documentation
      - Does this pull request introduce a new feature? yes
      - If yes, how is the feature documented? JavaDocs

