[ https://issues.apache.org/jira/browse/BAHIR-283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697472#comment-17697472 ]
ASF subversion and git services commented on BAHIR-283: ------------------------------------------------------- Commit c8b6f6122ae6411064a1ec55cf2c2fbc08d4979f in bahir-flink's branch refs/heads/master from dave [ https://gitbox.apache.org/repos/asf?p=bahir-flink.git;h=c8b6f61 ] [BAHIR-283] Fix dropped elements on InfluxDbSink > InfluxDBWriter fails to write the final element in each element > --------------------------------------------------------------- > > Key: BAHIR-283 > URL: https://issues.apache.org/jira/browse/BAHIR-283 > Project: Bahir > Issue Type: Bug > Components: Flink Streaming Connectors > Affects Versions: Flink-1.0 > Reporter: David Quigley > Priority: Major > Fix For: Flink-1.2.0 > > Original Estimate: 2h > Remaining Estimate: 2h > > {{ /** > * This method calls the InfluxDB write API whenever the element list > reaches the {@link > * #bufferSize}. It keeps track of the latest timestamp of each element. > It compares the latest > * timestamp with the context.timestamp() and takes the bigger (latest) > timestamp. > * > * @param in incoming data > * @param context current Flink context > * @see org.apache.flink.api.connector.sink.SinkWriter.Context > */ > @Override > public void write(final IN in, final Context context) throws IOException { > if (this.elements.size() == this.bufferSize) { > LOG.debug("Buffer size reached preparing to write the elements."); > this.writeCurrentElements(); > this.elements.clear(); > } else { > LOG.trace("Adding elements to buffer. Buffer size: {}", > this.elements.size()); > this.elements.add(this.schemaSerializer.serialize(in, context)); > if (context.timestamp() != null) { > this.lastTimestamp = Math.max(this.lastTimestamp, > context.timestamp()); > } > } > }}} > The bug is in this write method. If the number of elements in the buffer is > less than the configured buffer size, the current element is added to the > buffer. If the number of elements in the buffer is equal to the buffer size, > the buffer is flushed and the current element is not added to the next > buffer. This results in the current element being dropped. -- This message was sent by Atlassian Jira (v8.20.10#820010)