Repository: cassandra Updated Branches: refs/heads/cassandra-3.X ecf05b882 -> 747a62f60 refs/heads/trunk b0fdab4e4 -> ea909c8e6
Make the fanout size for LeveledCompactionStrategy to be configurable Patch by Dikang Gu; reviewed by Marcus Eriksson for CASSANDRA-11550 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9fc14bc5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9fc14bc5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9fc14bc5 Branch: refs/heads/cassandra-3.X Commit: 9fc14bc5a5467da3305668958f9fcd7e5d4dad7a Parents: c6ec31b Author: Dikang Gu <dikan...@gmail.com> Authored: Tue Sep 27 11:48:18 2016 -0700 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Oct 18 08:46:35 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/source/operating/compaction.rst | 6 +++- pylib/cqlshlib/cql3handling.py | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 5 +++ .../cassandra/db/ColumnFamilyStoreMBean.java | 5 +++ .../compaction/CompactionStrategyManager.java | 17 ++++++++++ .../compaction/LeveledCompactionStrategy.java | 34 +++++++++++++++++++- .../db/compaction/LeveledManifest.java | 21 ++++++++---- .../writers/MajorLeveledCompactionWriter.java | 4 ++- .../cassandra/tools/StandaloneScrubber.java | 4 +-- .../tools/nodetool/stats/TableStatsHolder.java | 2 +- .../LongLeveledCompactionStrategyTest.java | 2 +- .../unit/org/apache/cassandra/SchemaLoader.java | 1 + .../validation/miscellaneous/OverflowTest.java | 2 +- .../db/compaction/CompactionsCQLTest.java | 2 +- 15 files changed, 91 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 32a2dfd..20a64e1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550) * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784) * Improve sum aggregate functions (CASSANDRA-12417) * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/doc/source/operating/compaction.rst ---------------------------------------------------------------------- diff --git a/doc/source/operating/compaction.rst b/doc/source/operating/compaction.rst index b0f97c4..0f39000 100644 --- a/doc/source/operating/compaction.rst +++ b/doc/source/operating/compaction.rst @@ -262,7 +262,7 @@ and the attribute to change is ``CompactionParameters`` or ``CompactionParameter syntax for the json version is the same as you would use in an :ref:`ALTER TABLE <alter-table-statement>` statement - for example:: - { 'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb': 123 } + { 'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb': 123, 'fanout_size': 10} The setting is kept until someone executes an :ref:`ALTER TABLE <alter-table-statement>` that touches the compaction settings or restarts the node. @@ -379,6 +379,10 @@ LCS options The target compressed (if using compression) sstable size - the sstables can end up being larger if there are very large partitions on the node. +``fanout_size`` (default: 10) + The target size of levels increases by this fanout_size multiplier. You can reduce the space amplification by tuning + this option. + LCS also support the ``cassandra.disable_stcs_in_l0`` startup option (``-Dcassandra.disable_stcs_in_l0=true``) to avoid doing STCS in L0. http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/pylib/cqlshlib/cql3handling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py index f388f4c..8e86801 100644 --- a/pylib/cqlshlib/cql3handling.py +++ b/pylib/cqlshlib/cql3handling.py @@ -512,6 +512,7 @@ def cf_prop_val_mapkey_completer(ctxt, cass): opts.add('bucket_low') elif csc == 'LeveledCompactionStrategy': opts.add('sstable_size_in_mb') + opts.add('fanout_size') elif csc == 'DateTieredCompactionStrategy': opts.add('base_time_seconds') opts.add('max_sstable_age_days') http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index f89ac3f..0862686 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2481,6 +2481,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return compactionStrategyManager.getSSTableCountPerLevel(); } + public int getLevelFanoutSize() + { + return compactionStrategyManager.getLevelFanoutSize(); + } + public static class ViewFragment { public final List<SSTableReader> sstables; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java index ccaacf6..d788e2e 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java @@ -157,6 +157,11 @@ public interface ColumnFamilyStoreMBean public int[] getSSTableCountPerLevel(); /** + * @return sstable fanout size for level compaction strategy. + */ + public int getLevelFanoutSize(); + + /** * Get the ratio of droppable tombstones to real columns (and non-droppable tombstones) * @return ratio */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 1ad89e9..5679338 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -341,6 +341,23 @@ public class CompactionStrategyManager implements INotificationConsumer return 0; } + public int getLevelFanoutSize() + { + readLock.lock(); + try + { + if (repaired.get(0) instanceof LeveledCompactionStrategy) + { + return ((LeveledCompactionStrategy) repaired.get(0)).getLevelFanoutSize(); + } + } + finally + { + readLock.unlock(); + } + return LeveledCompactionStrategy.DEFAULT_LEVEL_FANOUT_SIZE; + } + public int[] getSSTableCountPerLevel() { readLock.lock(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index 0633251f..f943b19 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -47,15 +47,19 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategy.class); private static final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb"; private static final boolean tolerateSstableSize = Boolean.getBoolean(Config.PROPERTY_PREFIX + "tolerate_sstable_size"); + private static final String LEVEL_FANOUT_SIZE_OPTION = "fanout_size"; + public static final int DEFAULT_LEVEL_FANOUT_SIZE = 10; @VisibleForTesting final LeveledManifest manifest; private final int maxSSTableSizeInMB; + private final int levelFanoutSize; public LeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) { super(cfs, options); int configuredMaxSSTableSize = 160; + int configuredLevelFanoutSize = DEFAULT_LEVEL_FANOUT_SIZE; SizeTieredCompactionStrategyOptions localOptions = new SizeTieredCompactionStrategyOptions(options); if (options != null) { @@ -72,10 +76,16 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy configuredMaxSSTableSize, cfs.name, cfs.getColumnFamilyName()); } } + + if (options.containsKey(LEVEL_FANOUT_SIZE_OPTION)) + { + configuredLevelFanoutSize = Integer.parseInt(options.get(LEVEL_FANOUT_SIZE_OPTION)); + } } maxSSTableSizeInMB = configuredMaxSSTableSize; + levelFanoutSize = configuredLevelFanoutSize; - manifest = new LeveledManifest(cfs, this.maxSSTableSizeInMB, localOptions); + manifest = new LeveledManifest(cfs, this.maxSSTableSizeInMB, this.levelFanoutSize, localOptions); logger.trace("Created {}", manifest); } @@ -243,6 +253,11 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy return maxSSTableSizeInMB * 1024L * 1024L; } + public int getLevelFanoutSize() + { + return levelFanoutSize; + } + public ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) { Set<SSTableReader>[] sstablesPerLevel = manifest.getSStablesPerLevelSnapshot(); @@ -531,6 +546,23 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy uncheckedOptions.remove(SSTABLE_SIZE_OPTION); + // Validate the fanout_size option + String levelFanoutSize = options.containsKey(LEVEL_FANOUT_SIZE_OPTION) ? options.get(LEVEL_FANOUT_SIZE_OPTION) : String.valueOf(DEFAULT_LEVEL_FANOUT_SIZE); + try + { + int fanoutSize = Integer.parseInt(levelFanoutSize); + if (fanoutSize < 1) + { + throw new ConfigurationException(String.format("%s must be larger than 0, but was %s", LEVEL_FANOUT_SIZE_OPTION, fanoutSize)); + } + } + catch (NumberFormatException ex) + { + throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", size, LEVEL_FANOUT_SIZE_OPTION), ex); + } + + uncheckedOptions.remove(LEVEL_FANOUT_SIZE_OPTION); + uncheckedOptions = SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions); return uncheckedOptions; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java index 5a8f153..3d118de 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java @@ -70,12 +70,14 @@ public class LeveledManifest private final long maxSSTableSizeInBytes; private final SizeTieredCompactionStrategyOptions options; private final int [] compactionCounter; + private final int levelFanoutSize; - LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, SizeTieredCompactionStrategyOptions options) + LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, int fanoutSize, SizeTieredCompactionStrategyOptions options) { this.cfs = cfs; this.maxSSTableSizeInBytes = maxSSTableSizeInMB * 1024L * 1024L; this.options = options; + this.levelFanoutSize = fanoutSize; generations = new List[MAX_LEVEL_COUNT]; lastCompactedKeys = new PartitionPosition[MAX_LEVEL_COUNT]; @@ -87,14 +89,14 @@ public class LeveledManifest compactionCounter = new int[MAX_LEVEL_COUNT]; } - public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List<SSTableReader> sstables) + public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, int fanoutSize, List<SSTableReader> sstables) { - return create(cfs, maxSSTableSize, sstables, new SizeTieredCompactionStrategyOptions()); + return create(cfs, maxSSTableSize, fanoutSize, sstables, new SizeTieredCompactionStrategyOptions()); } - public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, Iterable<SSTableReader> sstables, SizeTieredCompactionStrategyOptions options) + public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, int fanoutSize, Iterable<SSTableReader> sstables, SizeTieredCompactionStrategyOptions options) { - LeveledManifest manifest = new LeveledManifest(cfs, maxSSTableSize, options); + LeveledManifest manifest = new LeveledManifest(cfs, maxSSTableSize, fanoutSize, options); // ensure all SSTables are in the manifest for (SSTableReader ssTableReader : sstables) @@ -282,11 +284,16 @@ public class LeveledManifest return builder.toString(); } - public static long maxBytesForLevel(int level, long maxSSTableSizeInBytes) + public long maxBytesForLevel(int level, long maxSSTableSizeInBytes) + { + return maxBytesForLevel(level, levelFanoutSize, maxSSTableSizeInBytes); + } + + public static long maxBytesForLevel(int level, int levelFanoutSize, long maxSSTableSizeInBytes) { if (level == 0) return 4L * maxSSTableSizeInBytes; - double bytes = Math.pow(10, level) * maxSSTableSizeInBytes; + double bytes = Math.pow(levelFanoutSize, level) * maxSSTableSizeInBytes; if (bytes > Long.MAX_VALUE) throw new RuntimeException("At most " + Long.MAX_VALUE + " bytes may be in a compaction level; your maxSSTableSize must be absurdly high to compute " + bytes); return (long) bytes; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java index 6cccfcb..0beb505 100644 --- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java +++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java @@ -41,6 +41,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter private int sstablesWritten = 0; private final long keysPerSSTable; private Directories.DataDirectory sstableDirectory; + private final int levelFanoutSize; public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, Directories directories, @@ -73,6 +74,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter { super(cfs, directories, txn, nonExpiredSSTables, keepOriginals); this.maxSSTableSize = maxSSTableSize; + this.levelFanoutSize = cfs.getLevelFanoutSize(); long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize); keysPerSSTable = estimatedTotalKeys / estimatedSSTables; } @@ -87,7 +89,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter if (totalWrittenInCurrentWriter > maxSSTableSize) { totalWrittenInLevel += totalWrittenInCurrentWriter; - if (totalWrittenInLevel > LeveledManifest.maxBytesForLevel(currentLevel, maxSSTableSize)) + if (totalWrittenInLevel > LeveledManifest.maxBytesForLevel(currentLevel, levelFanoutSize, maxSSTableSize)) { totalWrittenInLevel = 0; currentLevel++; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/src/java/org/apache/cassandra/tools/StandaloneScrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java index 8574179..54b340e 100644 --- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java +++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java @@ -177,12 +177,12 @@ public class StandaloneScrubber List<SSTableReader> repaired = Lists.newArrayList(Iterables.filter(sstables, repairedPredicate)); List<SSTableReader> unRepaired = Lists.newArrayList(Iterables.filter(sstables, Predicates.not(repairedPredicate))); - LeveledManifest repairedManifest = LeveledManifest.create(cfs, maxSizeInMB, repaired); + LeveledManifest repairedManifest = LeveledManifest.create(cfs, maxSizeInMB, cfs.getLevelFanoutSize(), repaired); for (int i = 1; i < repairedManifest.getLevelCount(); i++) { repairedManifest.repairOverlappingSSTables(i); } - LeveledManifest unRepairedManifest = LeveledManifest.create(cfs, maxSizeInMB, unRepaired); + LeveledManifest unRepairedManifest = LeveledManifest.create(cfs, maxSizeInMB, cfs.getLevelFanoutSize(), unRepaired); for (int i = 1; i < unRepairedManifest.getLevelCount(); i++) { unRepairedManifest.repairOverlappingSSTables(i); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java index a785528..150baf3 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java +++ b/src/java/org/apache/cassandra/tools/nodetool/stats/TableStatsHolder.java @@ -173,7 +173,7 @@ public class TableStatsHolder implements StatsHolder int count = leveledSStables[level]; long maxCount = 4L; // for L0 if (level > 0) - maxCount = (long) Math.pow(10, level); + maxCount = (long) Math.pow(table.getLevelFanoutSize(), level); // show max threshold for level when exceeded statsTable.sstablesInEachLevel.add(count + ((count > maxCount) ? "/" + maxCount : "")); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java index f8f5a7c..cf4aba2 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java @@ -129,7 +129,7 @@ public class LongLeveledCompactionStrategyTest { List<SSTableReader> sstables = manifest.getLevel(level); // score check - assert (double) SSTableReader.getTotalBytes(sstables) / LeveledManifest.maxBytesForLevel(level, 1 * 1024 * 1024) < 1.00; + assert (double) SSTableReader.getTotalBytes(sstables) / manifest.maxBytesForLevel(level, 1 * 1024 * 1024) < 1.00; // overlap check for levels greater than 0 for (SSTableReader sstable : sstables) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index d6819e1..2bf4805 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -112,6 +112,7 @@ public class SchemaLoader compactionOptions.put("tombstone_compaction_interval", "1"); Map<String, String> leveledOptions = new HashMap<String, String>(); leveledOptions.put("sstable_size_in_mb", "1"); + leveledOptions.put("fanout_size", "5"); // Keyspace 1 schema.add(KeyspaceMetadata.create(ks1, http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java index 49bfe88..6fdedc2 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/miscellaneous/OverflowTest.java @@ -111,7 +111,7 @@ public class OverflowTest extends CQLTester + "AND dclocal_read_repair_chance = 0.5 " + "AND gc_grace_seconds = 4 " + "AND bloom_filter_fp_chance = 0.01 " - + "AND compaction = { 'class' : 'LeveledCompactionStrategy', 'sstable_size_in_mb' : 10 } " + + "AND compaction = { 'class' : 'LeveledCompactionStrategy', 'sstable_size_in_mb' : 10, 'fanout_size' : 5 } " + "AND compression = { 'enabled': false } " + "AND caching = { 'keys': 'ALL', 'rows_per_partition': 'ALL' }"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9fc14bc5/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java index ca206b3..90e8380 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java @@ -50,7 +50,7 @@ public class CompactionsCQLTest extends CQLTester @Test public void testTriggerMinorCompactionLCS() throws Throwable { - createTable("CREATE TABLE %s (id text PRIMARY KEY) WITH compaction = {'class':'LeveledCompactionStrategy', 'sstable_size_in_mb':1};"); + createTable("CREATE TABLE %s (id text PRIMARY KEY) WITH compaction = {'class':'LeveledCompactionStrategy', 'sstable_size_in_mb':1, 'fanout_size':5};"); assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled()); execute("insert into %s (id) values ('1')"); flush();