This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 747e0bd1e4f1a42ffee98ca5ab85711059521fa6 Author: a181321 <anton.roskv...@volvo.com> AuthorDate: Thu Mar 23 23:00:17 2023 +0100 ARTEMIS-4215 JournalFlush might never happen when journal-sync-* is false --- .../artemis/core/io/buffer/TimedBuffer.java | 16 +- tests/soak-tests/pom.xml | 25 ++ .../main/resources/servers/interruptjf/broker.xml | 262 +++++++++++++++++++++ .../resources/servers/interruptjf/management.xml | 52 ++++ .../soak/interrupt/JournalFlushInterruptTest.java | 89 +++++++ .../unit/core/journal/impl/TimedBufferTest.java | 32 ++- 6 files changed, 462 insertions(+), 14 deletions(-) diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java index 4aa69aeb23..fd3dbf8b23 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/buffer/TimedBuffer.java @@ -271,9 +271,10 @@ public final class TimedBuffer extends CriticalComponentImpl { if (sync) { pendingSync = true; - - startSpin(); } + + startSpin(); + } } } @@ -293,9 +294,10 @@ public final class TimedBuffer extends CriticalComponentImpl { if (sync) { pendingSync = true; - - startSpin(); } + + startSpin(); + } } } @@ -436,12 +438,12 @@ public final class TimedBuffer extends CriticalComponentImpl { boolean useSleep = true; while (!closed) { - // We flush on the timer if there are pending syncs there and we've waited at least one + // We flush on the timer if there are pending syncs there or we've waited at least one // timeout since the time of the last flush. 
// Effectively flushing "resets" the timer // On the timeout verification, notice that we ignore the timeout check if we are using sleep - if (pendingSync) { + if (pendingSync || System.nanoTime() - lastFlushTime > timeout) { if (useSleep) { // if using sleep, we will always flush lastFlushTime = System.nanoTime(); @@ -458,7 +460,7 @@ public final class TimedBuffer extends CriticalComponentImpl { useSleep = sleepIfPossible(timeToSleep); } } - } else if (bufferObserver != null && System.nanoTime() - lastFlushTime > timeout) { + } else if (bufferObserver != null) { lastFlushTime = System.nanoTime(); // if not using flush we will spin and do the time checks manually flush(); diff --git a/tests/soak-tests/pom.xml b/tests/soak-tests/pom.xml index 6d6ea93d82..ace4bbcf1c 100644 --- a/tests/soak-tests/pom.xml +++ b/tests/soak-tests/pom.xml @@ -243,6 +243,31 @@ </args> </configuration> </execution> + <!-- Used on JournalFlushInterruptTest --> + <execution> + <phase>test-compile</phase> + <id>create-interruptjf</id> + <goals> + <goal>create</goal> + </goals> + <configuration> + <role>amq</role> + <user>artemis</user> + <password>artemis</password> + <allowAnonymous>true</allowAnonymous> + <noWeb>false</noWeb> + <instance>${basedir}/target/interruptjf</instance> + <configuration>${basedir}/target/classes/servers/interruptjf</configuration> + <args> + <arg>--java-options</arg> + <arg>-Djava.rmi.server.hostname=localhost</arg> + <arg>--queues</arg> + <arg>JournalFlushInterruptTest</arg> + <arg>--name</arg> + <arg>interruptjf</arg> + </args> + </configuration> + </execution> <!-- Used on LargeMessageInterruptTest --> <execution> <phase>test-compile</phase> diff --git a/tests/soak-tests/src/main/resources/servers/interruptjf/broker.xml b/tests/soak-tests/src/main/resources/servers/interruptjf/broker.xml new file mode 100644 index 0000000000..b4ee73eb94 --- /dev/null +++ b/tests/soak-tests/src/main/resources/servers/interruptjf/broker.xml @@ -0,0 +1,262 @@ +<?xml version='1.0'?> 
+<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:xi="http://www.w3.org/2001/XInclude" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq:core "> + + <name>jfinterrupt</name> + + <journal-sync-transactional>false</journal-sync-transactional> + <journal-sync-non-transactional>false</journal-sync-non-transactional> + <journal-buffer-timeout>10000</journal-buffer-timeout> + + <persistence-enabled>true</persistence-enabled> + + <!-- this could be ASYNCIO, MAPPED, NIO + ASYNCIO: Linux Libaio + MAPPED: mmap files + NIO: Plain Java Files + --> + <journal-type>ASYNCIO</journal-type> + + <paging-directory>./data/paging</paging-directory> + + <bindings-directory>./data/bindings</bindings-directory> + + <journal-directory>./data/journal</journal-directory> + + <large-messages-directory>./data/large-messages</large-messages-directory> + + + <!-- if you want to retain your journal uncomment this following configuration. + + This will allow your system to keep 7 days of your data, up to 10G. 
Tweak it according to your use case and capacity. + + it is recommended to use a separate storage unit from the journal for performance considerations. + + <journal-retention-directory period="7" unit="DAYS" storage-limit="10G">data/retention</journal-retention-directory> + + You can also enable retention by using the argument journal-retention on the `artemis create` command --> + + + + <journal-datasync>true</journal-datasync> + + <journal-min-files>2</journal-min-files> + + <journal-pool-files>10</journal-pool-files> + + <journal-device-block-size>4096</journal-device-block-size> + + <journal-file-size>10M</journal-file-size> + <!-- + You can verify the network health of a particular NIC by specifying the <network-check-NIC> element. + <network-check-NIC>theNicName</network-check-NIC> + --> + + <!-- + Use this to use an HTTP server to validate the network + <network-check-URL-list>http://www.apache.org</network-check-URL-list> --> + + <!-- <network-check-period>10000</network-check-period> --> + <!-- <network-check-timeout>1000</network-check-timeout> --> + + <!-- this is a comma separated list, no spaces, just DNS or IPs + it should accept IPV6 + + Warning: Make sure you understand your network topology as this is meant to validate if your network is valid. + Using IPs that could eventually disappear or be partially visible may defeat the purpose. 
+ You can use a list of multiple IPs, and any successful ping will make the server OK to continue running --> + <!-- <network-check-list>10.0.0.1</network-check-list> --> + + <!-- use this to customize the ping used for ipv4 addresses --> + <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> --> + + <!-- use this to customize the ping used for ipv6 addresses --> + <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> --> + + + + + <!-- how often we are looking for how many bytes are being used on the disk in ms --> + <disk-scan-period>5000</disk-scan-period> + + <!-- once the disk hits this limit the system will block, or close the connection in certain protocols + that won't support flow control. --> + <max-disk-usage>90</max-disk-usage> + + <!-- should the broker detect dead locks and other issues --> + <critical-analyzer>true</critical-analyzer> + + <critical-analyzer-timeout>120000</critical-analyzer-timeout> + + <critical-analyzer-check-period>60000</critical-analyzer-check-period> + + <critical-analyzer-policy>HALT</critical-analyzer-policy> + + + + <!-- the system will enter into page mode once you hit this limit. This is an estimate in bytes of how much the messages are using in memory + + The system will use half of the available memory (-Xmx) by default for the global-max-size. + You may specify a different value here if you need to customize it to your needs. + + <global-max-size>100Mb</global-max-size> --> + + <!-- the maximum number of messages accepted before entering full address mode. + if global-max-size is specified the full address mode will be specified by whatever hits it first. 
--> + <global-max-messages>-1</global-max-messages> + + <acceptors> + + <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it --> + <!-- amqpCredits: The number of credits sent to AMQP producers --> + <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark --> + <!-- amqpDuplicateDetection: If you are not using duplicate detection, set this to false + as duplicate detection requires applicationProperties to be parsed on the server. --> + <!-- amqpMinLargeMessageSize: Determines how many bytes are considered large, so we start using files to hold their data. + default: 102400, -1 would mean to disable large message control --> + + <!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add + "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url. + See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. --> + + + <!-- Acceptor for every supported protocol --> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor> + + <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.--> + <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor> + + <!-- STOMP Acceptor. --> + <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor> + + <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP for legacy HornetQ clients. 
--> + <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor> + + <!-- MQTT Acceptor --> + <acceptor name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor> + + </acceptors> + + + <security-settings> + <security-setting match="#"> + <permission type="createNonDurableQueue" roles="amq"/> + <permission type="deleteNonDurableQueue" roles="amq"/> + <permission type="createDurableQueue" roles="amq"/> + <permission type="deleteDurableQueue" roles="amq"/> + <permission type="createAddress" roles="amq"/> + <permission type="deleteAddress" roles="amq"/> + <permission type="consume" roles="amq"/> + <permission type="browse" roles="amq"/> + <permission type="send" roles="amq"/> + <!-- we need this otherwise ./artemis data imp wouldn't work --> + <permission type="manage" roles="amq"/> + </security-setting> + </security-settings> + + <address-settings> + <!-- if you define auto-create on certain queues, management has to be auto-create --> + <address-setting match="activemq.management#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + </address-setting> + <!--default for catch all--> + <address-setting match="#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + + <!-- if max-size-bytes and max-size-messages were both enabled, the system will enter into paging + based on the first attribute that hits the maximum value 
--> + <!-- limit for the address in bytes, -1 means unlimited --> + <max-size-bytes>-1</max-size-bytes> + + <!-- limit for the address in messages, -1 means unlimited --> + <max-size-messages>1000</max-size-messages> + + <!-- the size of each file on paging. Notice we keep files in memory while they are in use. + Lower this setting if you have too many queues in memory. --> + <page-size-bytes>10M</page-size-bytes> + + <!-- limit how many messages are read from paging into the Queue. --> + <max-read-page-messages>-1</max-read-page-messages> + + <!-- limit how much memory is read from paging into the Queue. --> + <max-read-page-bytes>20M</max-read-page-bytes> + + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + <auto-delete-queues>false</auto-delete-queues> + <auto-delete-addresses>false</auto-delete-addresses> + </address-setting> + </address-settings> + + <addresses> + <address name="DLQ"> + <anycast> + <queue name="DLQ" /> + </anycast> + </address> + <address name="ExpiryQueue"> + <anycast> + <queue name="ExpiryQueue" /> + </anycast> + </address> + <address name="JournalFlushInterruptTest"> + <anycast> + <queue name="JournalFlushInterruptTest" /> + </anycast> + </address> + + </addresses> + + + <!-- Uncomment the following if you want to use the Standard LoggingActiveMQServerPlugin plugin to log in events + <broker-plugins> + <broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin"> + <property key="LOG_ALL_EVENTS" value="true"/> + <property key="LOG_CONNECTION_EVENTS" value="true"/> + <property key="LOG_SESSION_EVENTS" value="true"/> + <property key="LOG_CONSUMER_EVENTS" value="true"/> + <property key="LOG_DELIVERING_EVENTS" value="true"/> + <property key="LOG_SENDING_EVENTS" value="true"/> + <property key="LOG_INTERNAL_EVENTS" 
value="true"/> + </broker-plugin> + </broker-plugins> + --> + + </core> +</configuration> diff --git a/tests/soak-tests/src/main/resources/servers/interruptjf/management.xml b/tests/soak-tests/src/main/resources/servers/interruptjf/management.xml new file mode 100644 index 0000000000..1d38e28ac9 --- /dev/null +++ b/tests/soak-tests/src/main/resources/servers/interruptjf/management.xml @@ -0,0 +1,52 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<management-context xmlns="http://activemq.apache.org/schema"> + <connector connector-port="1099"/> + <authorisation> + <allowlist> + <entry domain="hawtio"/> + </allowlist> + <default-access> + <access method="list*" roles="amq"/> + <access method="get*" roles="amq"/> + <access method="is*" roles="amq"/> + <access method="set*" roles="amq"/> + <access method="*" roles="amq"/> + </default-access> + <role-access> + <match domain="org.apache.activemq.artemis"> + <access method="list*" roles="amq"/> + <access method="get*" roles="amq"/> + <access method="is*" roles="amq"/> + <access method="set*" roles="amq"/> + <!-- Note count and browse are needed to access the browse tab in the console--> + <access method="browse*" roles="amq"/> + <access method="count*" roles="amq"/> + <access method="*" roles="amq"/> + </match> + <!--example of how to configure a specific object--> + <!--<match domain="org.apache.activemq.artemis" key="subcomponent=queues"> + <access method="list*" roles="view,update,amq"/> + <access method="get*" roles="view,update,amq"/> + <access method="is*" roles="view,update,amq"/> + <access method="set*" roles="update,amq"/> + <access method="*" roles="amq"/> + </match>--> + </role-access> + </authorisation> +</management-context> \ No newline at end of file diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interrupt/JournalFlushInterruptTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interrupt/JournalFlushInterruptTest.java new file mode 100644 index 0000000000..8f00dd072f --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/interrupt/JournalFlushInterruptTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak.interrupt; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.tests.soak.SoakTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JournalFlushInterruptTest extends SoakTestBase { + public static final String SERVER_NAME_0 = "interruptjf"; + private static final String JMX_SERVER_HOSTNAME = "localhost"; + private static final int JMX_SERVER_PORT_0 = 1099; + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi"; + static ObjectNameBuilder liveNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), 
"jfinterrupt", true); + Process serverProcess; + + @Before + public void before() throws Exception { + cleanupData(SERVER_NAME_0); + serverProcess = startServer(SERVER_NAME_0, 0, 30000); + disableCheckThread(); + } + + private void killProcess(Process process) throws Exception { + Runtime.getRuntime().exec("kill -9 " + process.pid()); + } + + @Test + public void testInterruptJF() throws Throwable { + final ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616"); + final String queueName = "JournalFlushInterruptTest"; + final int messageCount = 100; + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(queueName)); + for (int i = 0; i < messageCount; i++) { + producer.send(session.createTextMessage("MessageCount: " + i)); + } + } + + QueueControl queueControl = getQueueControl(liveURI, liveNameBuilder, queueName, queueName, RoutingType.ANYCAST, 5000); + + Assert.assertEquals(messageCount, queueControl.getMessageCount()); + Thread.sleep(100); + + killProcess(serverProcess); + Assert.assertTrue(serverProcess.waitFor(1, TimeUnit.MINUTES)); + serverProcess = startServer(SERVER_NAME_0, 0, 0); + + waitForServerToStart("tcp://localhost:61616", "artemis", "artemis", 5000); + queueControl = getQueueControl(liveURI, liveNameBuilder, queueName, queueName, RoutingType.ANYCAST, 5000); + + Assert.assertEquals(messageCount, queueControl.getMessageCount()); + + } + +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java index 38ff847be5..157dc8e268 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java +++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/journal/impl/TimedBufferTest.java @@ -465,20 +465,38 @@ public class TimedBufferTest extends ActiveMQTestBase { timedBuffer.setObserver(new TestObserver()); int x = 0; + byte[] bytes; + ActiveMQBuffer buff; - byte[] bytes = new byte[10]; + for (int i = 0; i < 3; i++) { + bytes = new byte[10]; + for (int j = 0; j < 10; j++) { + bytes[j] = ActiveMQTestBase.getSamplebyte(x++); + } + + buff = ActiveMQBuffers.wrappedBuffer(bytes); + + timedBuffer.checkSize(10); + timedBuffer.addBytes(buff, false, dummyCallback); + } + + Thread.sleep(200); + int count = flushTimes.get(); + Assert.assertTrue(count < 3); + + bytes = new byte[10]; for (int j = 0; j < 10; j++) { bytes[j] = ActiveMQTestBase.getSamplebyte(x++); } - ActiveMQBuffer buff = ActiveMQBuffers.wrappedBuffer(bytes); + buff = ActiveMQBuffers.wrappedBuffer(bytes); timedBuffer.checkSize(10); timedBuffer.addBytes(buff, false, dummyCallback); Thread.sleep(200); - Assert.assertEquals(0, flushTimes.get()); + Assert.assertEquals(count + 1, flushTimes.get()); bytes = new byte[10]; for (int j = 0; j < 10; j++) { @@ -492,17 +510,17 @@ public class TimedBufferTest extends ActiveMQTestBase { Thread.sleep(500); - Assert.assertEquals(1, flushTimes.get()); + Assert.assertEquals(count + 2, flushTimes.get()); ByteBuffer flushedBuffer = buffers.get(0); - Assert.assertEquals(20, flushedBuffer.limit()); + Assert.assertEquals(30, flushedBuffer.limit()); - Assert.assertEquals(20, flushedBuffer.capacity()); + Assert.assertEquals(30, flushedBuffer.capacity()); flushedBuffer.rewind(); - for (int i = 0; i < 20; i++) { + for (int i = 0; i < 30; i++) { Assert.assertEquals(ActiveMQTestBase.getSamplebyte(i), flushedBuffer.get()); } } finally {