David Quigley created BAHIR-283:
-----------------------------------

             Summary: InfluxDBWriter fails to write the final element in each batch
                 Key: BAHIR-283
                 URL: https://issues.apache.org/jira/browse/BAHIR-283
             Project: Bahir
          Issue Type: Bug
          Components: Flink Streaming Connectors
    Affects Versions: Flink-1.0
            Reporter: David Quigley
             Fix For: Flink-Next
{{
/**
 * This method calls the InfluxDB write API whenever the element list reaches the {@link
 * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
 * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
 *
 * @param in incoming data
 * @param context current Flink context
 * @see org.apache.flink.api.connector.sink.SinkWriter.Context
 */
@Override
public void write(final IN in, final Context context) throws IOException {
    if (this.elements.size() == this.bufferSize) {
        LOG.debug("Buffer size reached preparing to write the elements.");
        this.writeCurrentElements();
        this.elements.clear();
    } else {
        LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
        this.elements.add(this.schemaSerializer.serialize(in, context));
        if (context.timestamp() != null) {
            this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
        }
    }
}
}}

The bug is in this write method. If the number of elements in the buffer is less than the configured buffer size, the current element is added to the buffer. If the number of elements in the buffer is already equal to the buffer size, the buffer is flushed and cleared, but the current element is never added to the buffer at all. As a result, the element that triggers each flush is silently dropped.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
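For reference, a minimal sketch of one possible fix, reusing the field and method names from the snippet above (an illustration, not a tested patch): flush when the buffer is already full, then unconditionally buffer the incoming element, so the record that triggers the flush is no longer lost.

{{
@Override
public void write(final IN in, final Context context) throws IOException {
    if (this.elements.size() == this.bufferSize) {
        // Flush the full buffer first...
        LOG.debug("Buffer size reached, writing the buffered elements.");
        this.writeCurrentElements();
        this.elements.clear();
    }
    // ...then always add the incoming element, including on the call that flushed.
    LOG.trace("Adding element to buffer. Buffer size: {}", this.elements.size());
    this.elements.add(this.schemaSerializer.serialize(in, context));
    if (context.timestamp() != null) {
        this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
    }
}
}}

An equivalent alternative would be to add the element first and flush once the buffer reaches {{#bufferSize}}; either ordering ensures every element is serialized and written exactly once.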