[ 
https://issues.apache.org/jira/browse/BAHIR-283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697472#comment-17697472
 ] 

ASF subversion and git services commented on BAHIR-283:
-------------------------------------------------------

Commit c8b6f6122ae6411064a1ec55cf2c2fbc08d4979f in bahir-flink's branch 
refs/heads/master from dave
[ https://gitbox.apache.org/repos/asf?p=bahir-flink.git;h=c8b6f61 ]

[BAHIR-283] Fix dropped elements on InfluxDbSink



> InfluxDBWriter fails to write the final element in each element
> ---------------------------------------------------------------
>
>                 Key: BAHIR-283
>                 URL: https://issues.apache.org/jira/browse/BAHIR-283
>             Project: Bahir
>          Issue Type: Bug
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>            Reporter: David Quigley
>            Priority: Major
>             Fix For: Flink-1.2.0
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> {{    /**
>      * This method calls the InfluxDB write API whenever the element list 
> reaches the {@link
>      * #bufferSize}. It keeps track of the latest timestamp of each element. 
> It compares the latest
>      * timestamp with the context.timestamp() and takes the bigger (latest) 
> timestamp.
>      *
>      * @param in incoming data
>      * @param context current Flink context
>      * @see org.apache.flink.api.connector.sink.SinkWriter.Context
>      */
>     @Override
>     public void write(final IN in, final Context context) throws IOException {
>         if (this.elements.size() == this.bufferSize) {
>             LOG.debug("Buffer size reached preparing to write the elements.");
>             this.writeCurrentElements();
>             this.elements.clear();
>         } else {
>             LOG.trace("Adding elements to buffer. Buffer size: {}", 
> this.elements.size());
>             this.elements.add(this.schemaSerializer.serialize(in, context));
>             if (context.timestamp() != null) {
>                 this.lastTimestamp = Math.max(this.lastTimestamp, 
> context.timestamp());
>             }
>         }
>     }}}
> The bug is in this write method. If the number of elements in the buffer is 
> less than the configured buffer size, the current element is added to the 
> buffer. If the number of elements in the buffer is equal to the buffer size, 
> the buffer is flushed and the current element is not added to the next 
> buffer. This results in the current element being dropped.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to