This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new c984536ea98 SOLR-16608: Ability to compress the collection state
(#1267)
c984536ea98 is described below
commit c984536ea9800dc70e27eed6671347764d43ac76
Author: Justin Sweeney <[email protected]>
AuthorDate: Mon Feb 6 00:43:07 2023 -0700
SOLR-16608: Ability to compress the collection state (#1267)
---
solr/CHANGES.txt | 2 +
.../src/java/org/apache/solr/cloud/Overseer.java | 32 +++++-
.../java/org/apache/solr/cloud/ZkController.java | 11 ++-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 18 +++-
.../src/java/org/apache/solr/core/CloudConfig.java | 37 ++++++-
.../java/org/apache/solr/core/SolrXmlConfig.java | 6 ++
.../solr/cloud/overseer/ZkStateReaderTest.java | 88 ++++++++++++++++-
.../solr/cloud/overseer/ZkStateWriterTest.java | 107 ++++++++++++++++++++-
solr/server/solr/solr.xml | 2 +
.../pages/configuring-solr-xml.adoc | 20 ++++
.../org/apache/solr/common/cloud/SolrZkClient.java | 45 +++++++++
.../org/apache/solr/common/util/Compressor.java | 48 +++++++++
.../apache/solr/common/util/ZLibCompressor.java | 102 ++++++++++++++++++++
.../cloud/SolrZkClientCompressedDataTest.java | 89 +++++++++++++++++
.../solr/common/util/ZLibCompressorTest.java | 77 +++++++++++++++
15 files changed, 669 insertions(+), 15 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index b62a0338362..ed9672fe84b 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -51,6 +51,8 @@ New Features
* SOLR-16596: Learning To Rank - Added support for null feature values in
multiple additive trees models (Anna Ruggero via Alessandro Benedetti)
+* SOLR-16608: Ability to compress state.json in Zookeeper (Justin Sweeney via
noble)
+
Improvements
---------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index db633e315bc..1a7295071de 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -53,6 +53,7 @@ import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrCloseable;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.StringUtils;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Slice;
@@ -61,11 +62,13 @@ import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.common.util.Compressor;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.common.util.ZLibCompressor;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrInfoBean;
@@ -199,9 +202,18 @@ public class Overseer implements SolrCloseable {
private SolrMetricsContext clusterStateUpdaterMetricContext;
+ private final int minStateByteLenForCompression;
+
+ private final Compressor compressor;
+
private boolean isClosed = false;
- public ClusterStateUpdater(final ZkStateReader reader, final String myId,
Stats zkStats) {
+ public ClusterStateUpdater(
+ final ZkStateReader reader,
+ final String myId,
+ Stats zkStats,
+ int minStateByteLenForCompression,
+ Compressor compressor) {
this.zkClient = reader.getZkClient();
this.zkStats = zkStats;
this.stateUpdateQueue = getStateUpdateQueue(zkStats);
@@ -211,6 +223,8 @@ public class Overseer implements SolrCloseable {
this.completedMap = getCompletedMap(zkClient);
this.myId = myId;
this.reader = reader;
+ this.minStateByteLenForCompression = minStateByteLenForCompression;
+ this.compressor = compressor;
clusterStateUpdaterMetricContext =
solrMetricsContext.getChildContext(this);
clusterStateUpdaterMetricContext.gauge(
@@ -263,7 +277,8 @@ public class Overseer implements SolrCloseable {
try {
reader.forciblyRefreshAllClusterStateSlow();
clusterState = reader.getClusterState();
- zkStateWriter = new ZkStateWriter(reader, stats);
+ zkStateWriter =
+ new ZkStateWriter(reader, stats,
minStateByteLenForCompression, compressor);
refreshClusterState = false;
// if there were any errors while processing
@@ -728,9 +743,20 @@ public class Overseer implements SolrCloseable {
createOverseerNode(reader.getZkClient());
// launch cluster state updater thread
ThreadGroup tg = new ThreadGroup("Overseer state updater.");
+ String stateCompressionProviderClass = config.getStateCompressorClass();
+ Compressor compressor =
+ StringUtils.isEmpty(stateCompressionProviderClass)
+ ? new ZLibCompressor()
+ : zkController
+ .getCoreContainer()
+ .getResourceLoader()
+ .newInstance(stateCompressionProviderClass, Compressor.class);
updaterThread =
new OverseerThread(
- tg, new ClusterStateUpdater(reader, id, stats),
"OverseerStateUpdate-" + id);
+ tg,
+ new ClusterStateUpdater(
+ reader, id, stats, config.getMinStateByteLenForCompression(),
compressor),
+ "OverseerStateUpdate-" + id);
updaterThread.setDaemon(true);
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation
process.");
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 37524adf5ff..cf888851b1c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -103,6 +103,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.Compressor;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
@@ -110,6 +111,7 @@ import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.URLUtil;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.common.util.ZLibCompressor;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
@@ -371,6 +373,12 @@ public class ZkController implements Closeable {
strat.setZkCredentialsToAddAutomatically(zkCredentialsProvider);
addOnReconnectListener(getConfigDirListener());
+ String stateCompressionProviderClass =
cloudConfig.getStateCompressorClass();
+ Compressor compressor =
+ StringUtils.isEmpty(stateCompressionProviderClass)
+ ? new ZLibCompressor()
+ :
cc.getResourceLoader().newInstance(stateCompressionProviderClass,
Compressor.class);
+
zkClient =
new SolrZkClient(
zkServerAddress,
@@ -388,7 +396,8 @@ public class ZkController implements Closeable {
markAllAsNotLeader(descriptorsSupplier);
},
zkACLProvider,
- cc::isShutDown);
+ cc::isShutDown,
+ compressor);
// Refuse to start if ZK has a non empty /clusterstate.json
checkNoOldClusterstate(zkClient);
diff --git
a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 6aa561632cd..9cd2089e881 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -33,6 +33,7 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.PerReplicaStatesFetcher;
import org.apache.solr.common.cloud.PerReplicaStatesOps;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Compressor;
import org.apache.solr.common.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -78,12 +79,24 @@ public class ZkStateWriter {
*/
protected boolean invalidState = false;
- public ZkStateWriter(ZkStateReader zkStateReader, Stats stats) {
+ // If the state.json is greater than this many bytes and compression is
enabled in solr.xml, then
+ // the data will be compressed
+ protected int minStateByteLenForCompression;
+
+ protected Compressor compressor;
+
+ public ZkStateWriter(
+ ZkStateReader zkStateReader,
+ Stats stats,
+ int minStateByteLenForCompression,
+ Compressor compressor) {
assert zkStateReader != null;
this.reader = zkStateReader;
this.stats = stats;
this.clusterState = zkStateReader.getClusterState();
+ this.minStateByteLenForCompression = minStateByteLenForCompression;
+ this.compressor = compressor;
}
/**
@@ -273,6 +286,9 @@ public class ZkStateWriter {
reader.getZkClient().clean(path);
} else {
byte[] data = Utils.toJSON(singletonMap(c.getName(), c));
+ if (minStateByteLenForCompression > -1 && data.length >
minStateByteLenForCompression) {
+ data = compressor.compressBytes(data);
+ }
if (reader.getZkClient().exists(path, true)) {
if (log.isDebugEnabled()) {
log.debug("going to update_collection {} version: {}", path,
c.getZNodeVersion());
diff --git a/solr/core/src/java/org/apache/solr/core/CloudConfig.java
b/solr/core/src/java/org/apache/solr/core/CloudConfig.java
index f7a7d605980..4cb4026579b 100644
--- a/solr/core/src/java/org/apache/solr/core/CloudConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/CloudConfig.java
@@ -54,6 +54,10 @@ public class CloudConfig {
private final boolean useDistributedCollectionConfigSetExecution;
+ private final int minStateByteLenForCompression;
+
+ private final String stateCompressorClass;
+
CloudConfig(
String zkHost,
int zkClientTimeout,
@@ -71,7 +75,9 @@ public class CloudConfig {
String pkiHandlerPrivateKeyPath,
String pkiHandlerPublicKeyPath,
boolean useDistributedClusterStateUpdates,
- boolean useDistributedCollectionConfigSetExecution) {
+ boolean useDistributedCollectionConfigSetExecution,
+ int minStateByteLenForCompression,
+ String stateCompressorClass) {
this.zkHost = zkHost;
this.zkClientTimeout = zkClientTimeout;
this.hostPort = hostPort;
@@ -89,6 +95,8 @@ public class CloudConfig {
this.pkiHandlerPublicKeyPath = pkiHandlerPublicKeyPath;
this.useDistributedClusterStateUpdates = useDistributedClusterStateUpdates;
this.useDistributedCollectionConfigSetExecution =
useDistributedCollectionConfigSetExecution;
+ this.minStateByteLenForCompression = minStateByteLenForCompression;
+ this.stateCompressorClass = stateCompressorClass;
if (useDistributedCollectionConfigSetExecution &&
!useDistributedClusterStateUpdates) {
throw new SolrException(
@@ -173,6 +181,14 @@ public class CloudConfig {
return useDistributedCollectionConfigSetExecution;
}
+ public int getMinStateByteLenForCompression() {
+ return minStateByteLenForCompression;
+ }
+
+ public String getStateCompressorClass() {
+ return stateCompressorClass;
+ }
+
public static class CloudConfigBuilder {
private static final int DEFAULT_ZK_CLIENT_TIMEOUT = 45000;
@@ -180,6 +196,8 @@ public class CloudConfig {
private static final int DEFAULT_LEADER_CONFLICT_RESOLVE_WAIT = 180000;
private static final int DEFAULT_CREATE_COLLECTION_ACTIVE_WAIT = 45; // 45
seconds
private static final boolean DEFAULT_CREATE_COLLECTION_CHECK_LEADER_ACTIVE
= false;
+ private static final int DEFAULT_MINIMUM_STATE_SIZE_FOR_COMPRESSION =
+ -1; // By default compression for state is disabled
private String zkHost;
private int zkClientTimeout = Integer.getInteger("zkClientTimeout",
DEFAULT_ZK_CLIENT_TIMEOUT);
@@ -199,6 +217,9 @@ public class CloudConfig {
private String pkiHandlerPublicKeyPath;
private boolean useDistributedClusterStateUpdates = false;
private boolean useDistributedCollectionConfigSetExecution = false;
+ private int minStateByteLenForCompression =
DEFAULT_MINIMUM_STATE_SIZE_FOR_COMPRESSION;
+
+ private String stateCompressorClass;
public CloudConfigBuilder(String hostName, int hostPort) {
this(hostName, hostPort, null);
@@ -286,6 +307,16 @@ public class CloudConfig {
return this;
}
+ public CloudConfigBuilder setMinStateByteLenForCompression(int
minStateByteLenForCompression) {
+ this.minStateByteLenForCompression = minStateByteLenForCompression;
+ return this;
+ }
+
+ public CloudConfigBuilder setStateCompressorClass(String
stateCompressorClass) {
+ this.stateCompressorClass = stateCompressorClass;
+ return this;
+ }
+
public CloudConfig build() {
return new CloudConfig(
zkHost,
@@ -304,7 +335,9 @@ public class CloudConfig {
pkiHandlerPrivateKeyPath,
pkiHandlerPublicKeyPath,
useDistributedClusterStateUpdates,
- useDistributedCollectionConfigSetExecution);
+ useDistributedCollectionConfigSetExecution,
+ minStateByteLenForCompression,
+ stateCompressorClass);
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
index d2e8bf1e6a5..bc85f6147d9 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrXmlConfig.java
@@ -558,6 +558,12 @@ public class SolrXmlConfig {
case "distributedCollectionConfigSetExecution":
builder.setUseDistributedCollectionConfigSetExecution(Boolean.parseBoolean(value));
break;
+ case "minStateByteLenForCompression":
+ builder.setMinStateByteLenForCompression(parseInt(name, value));
+ break;
+ case "stateCompressor":
+ builder.setStateCompressorClass(value);
+ break;
default:
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
diff --git
a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index fb7ea1c7990..51a712221d9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -50,6 +50,7 @@ import org.apache.solr.common.util.CommonTestInjection;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.ZLibCompressor;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.solr.util.LogLevel;
import org.apache.solr.util.TimeOut;
@@ -95,7 +96,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
@Before
public void setUp() throws Exception {
super.setUp();
- fixture = setupTestFixture(getTestName());
+ fixture = setupTestFixture(getTestName(), -1);
}
@Override
@@ -107,7 +108,8 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
super.tearDown();
}
- private static TestFixture setupTestFixture(String testPrefix) throws
Exception {
+ private static TestFixture setupTestFixture(String testPrefix, int
minStateByteLenForCompression)
+ throws Exception {
Path zkDir = createTempDir(testPrefix);
ZkTestServer server = new ZkTestServer(zkDir);
server.run();
@@ -118,7 +120,8 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
ZkStateReader reader = new ZkStateReader(zkClient);
reader.createClusterStateWatchersAndUpdate();
- ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+ ZkStateWriter writer =
+ new ZkStateWriter(reader, new Stats(), minStateByteLenForCompression,
new ZLibCompressor());
return new TestFixture(server, zkClient, reader, writer);
}
@@ -447,6 +450,85 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
assertEquals(0, ref.get().getZNodeVersion());
}
+ public void testForciblyRefreshAllClusterStateCompressed() throws Exception {
+ fixture.close();
+ fixture = setupTestFixture(getTestName(), 0);
+ ZkStateWriter writer = fixture.writer;
+ ZkStateReader reader = fixture.reader;
+
+ reader.registerCore("c1"); // watching c1, so it should get non lazy
reference
+ fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+ reader.forciblyRefreshAllClusterStateSlow();
+ // Initially there should be no c1 collection.
+ assertNull(reader.getClusterState().getCollectionRef("c1"));
+
+ // create new collection
+ DocCollection state =
+ new DocCollection(
+ "c1",
+ new HashMap<>(),
+ Map.of(ZkStateReader.CONFIGNAME_PROP,
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ 0);
+ ZkWriteCommand wc = new ZkWriteCommand("c1", state);
+ writer.enqueueUpdate(reader.getClusterState(),
Collections.singletonList(wc), null);
+ writer.writePendingUpdates();
+
+ assertTrue(fixture.zkClient.exists(ZkStateReader.COLLECTIONS_ZKNODE +
"/c1/state.json", true));
+
+ reader.forciblyRefreshAllClusterStateSlow();
+ ClusterState.CollectionRef ref =
reader.getClusterState().getCollectionRef("c1");
+ assertNotNull(ref);
+ assertFalse(ref.isLazilyLoaded());
+ assertEquals(0, ref.get().getZNodeVersion());
+
+ // update the collection
+ state =
+ new DocCollection(
+ "c1",
+ new HashMap<>(),
+ Map.of(ZkStateReader.CONFIGNAME_PROP,
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ ref.get().getZNodeVersion());
+ wc = new ZkWriteCommand("c1", state);
+ writer.enqueueUpdate(reader.getClusterState(),
Collections.singletonList(wc), null);
+ writer.writePendingUpdates();
+
+ reader.forciblyRefreshAllClusterStateSlow();
+ ref = reader.getClusterState().getCollectionRef("c1");
+ assertNotNull(ref);
+ assertFalse(ref.isLazilyLoaded());
+ assertEquals(1, ref.get().getZNodeVersion());
+
+ // delete the collection c1, add a collection c2 that is NOT watched
+ ZkWriteCommand wc1 = new ZkWriteCommand("c1", null);
+
+ fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+ state =
+ new DocCollection(
+ "c2",
+ new HashMap<>(),
+ Map.of(ZkStateReader.CONFIGNAME_PROP,
ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
+ DocRouter.DEFAULT,
+ 0);
+ ZkWriteCommand wc2 = new ZkWriteCommand("c2", state);
+
+ writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2),
null);
+ writer.writePendingUpdates();
+
+ reader.forciblyRefreshAllClusterStateSlow();
+ ref = reader.getClusterState().getCollectionRef("c1");
+ assertNull(ref);
+
+ ref = reader.getClusterState().getCollectionRef("c2");
+ assertNotNull(ref);
+ assertTrue(
+ "c2 should have been lazily loaded but is not!",
+ ref.isLazilyLoaded()); // c2 should be lazily loaded as it's not
watched
+ assertEquals(0, ref.get().getZNodeVersion());
+ }
+
public void testGetCurrentCollections() throws Exception {
ZkStateWriter writer = fixture.writer;
ZkStateReader reader = fixture.reader;
diff --git
a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index 5c3e444c426..f5b2d4a25e1 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -33,10 +33,13 @@ import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.PerReplicaStatesFetcher;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Compressor;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.common.util.ZLibCompressor;
import org.apache.solr.handler.admin.ConfigSetsHandler;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
@@ -50,6 +53,8 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
private static final ZkStateWriter.ZkWriteCallback FAIL_ON_WRITE =
() -> fail("Got unexpected flush");
+ private static final Compressor STATE_COMPRESSION_PROVIDER = new
ZLibCompressor();
+
@BeforeClass
public static void setup() {
System.setProperty("solr.OverseerStateUpdateDelay", "1000");
@@ -94,7 +99,8 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
ZkWriteCommand c3 =
new ZkWriteCommand(
"c3", new DocCollection("c3", new HashMap<>(), props,
DocRouter.DEFAULT, 0));
- ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+ ZkStateWriter writer =
+ new ZkStateWriter(reader, new Stats(), -1,
STATE_COMPRESSION_PROVIDER);
// First write is flushed immediately
ClusterState clusterState =
@@ -171,7 +177,8 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
0,
new PerReplicaStatesFetcher.LazyPrsSupplier(
zkClient, DocCollection.getCollectionPath("c1"))));
- ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+ ZkStateWriter writer =
+ new ZkStateWriter(reader, new Stats(), -1,
STATE_COMPRESSION_PROVIDER);
// First write is flushed immediately
ClusterState clusterState =
@@ -252,7 +259,8 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
0,
new PerReplicaStatesFetcher.LazyPrsSupplier(
zkClient, DocCollection.getCollectionPath("prs1"))));
- ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+ ZkStateWriter writer =
+ new ZkStateWriter(reader, new Stats(), -1,
STATE_COMPRESSION_PROVIDER);
// First write is flushed immediately
ClusterState clusterState =
@@ -299,7 +307,8 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
try (ZkStateReader reader = new ZkStateReader(zkClient)) {
reader.createClusterStateWatchersAndUpdate();
- ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+ ZkStateWriter writer =
+ new ZkStateWriter(reader, new Stats(), -1,
STATE_COMPRESSION_PROVIDER);
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
@@ -347,7 +356,8 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
try (ZkStateReader reader = new ZkStateReader(zkClient)) {
reader.createClusterStateWatchersAndUpdate();
- ZkStateWriter writer = new ZkStateWriter(reader, new Stats());
+ ZkStateWriter writer =
+ new ZkStateWriter(reader, new Stats(), -1,
STATE_COMPRESSION_PROVIDER);
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
@@ -427,4 +437,91 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
server.shutdown();
}
}
+
+ public void testSingleExternalCollectionCompressedState() throws Exception {
+ Path zkDir = createTempDir("testSingleExternalCollection");
+
+ ZkTestServer server = new ZkTestServer(zkDir);
+
+ SolrZkClient zkClient = null;
+
+ Compressor compressor = new ZLibCompressor();
+
+ try {
+ server.run();
+
+ zkClient = new SolrZkClient(server.getZkAddress(),
OverseerTest.DEFAULT_CONNECTION_TIMEOUT);
+ ZkController.createClusterZkNodes(zkClient);
+
+ try (ZkStateReader reader = new ZkStateReader(zkClient)) {
+ reader.createClusterStateWatchersAndUpdate();
+
+ ZkStateWriter writer = new ZkStateWriter(reader, new Stats(), 500000,
compressor);
+
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
+
+ // create new collection with stateFormat = 2
+ ZkWriteCommand c1 =
+ new ZkWriteCommand(
+ "c1",
+ new DocCollection(
+ "c1",
+ new HashMap<String, Slice>(),
+ new HashMap<String, Object>(),
+ DocRouter.DEFAULT,
+ 0));
+
+ writer.enqueueUpdate(reader.getClusterState(),
Collections.singletonList(c1), null);
+ writer.writePendingUpdates();
+
+ byte[] data =
+ zkClient
+ .getZooKeeper()
+ .getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json",
null, null);
+ Map<?, ?> map = (Map<?, ?>) Utils.fromJSON(data);
+ assertNotNull(map.get("c1"));
+ }
+
+ try (ZkStateReader reader = new ZkStateReader(zkClient)) {
+ reader.createClusterStateWatchersAndUpdate();
+
+ ZkStateWriter writer = new ZkStateWriter(reader, new Stats(), 500000,
compressor);
+
+ zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
+
+ // create new collection with stateFormat = 2 that is large enough to
exceed the minimum
+ // size for compression
+ Map<String, Slice> slices = new HashMap<>();
+ for (int i = 0; i < 4096; i++) {
+ Map<String, Replica> replicas = new HashMap<>();
+ Map<String, Object> replicaProps = new HashMap<>();
+ replicaProps.put(ZkStateReader.NODE_NAME_PROP, "node1:8983_8983");
+ replicaProps.put(ZkStateReader.CORE_NAME_PROP, "coreNode" + i);
+ replicaProps.put(ZkStateReader.REPLICA_TYPE, "NRT");
+ replicaProps.put(ZkStateReader.BASE_URL_PROP,
"http://localhost:8983");
+ replicas.put(
+ "coreNode" + i, new Replica("coreNode" + i, replicaProps, "c2",
"shard" + i));
+ slices.put("shard" + i, new Slice("shard" + i, replicas, new
HashMap<>(), "c2"));
+ }
+ ZkWriteCommand c1 =
+ new ZkWriteCommand(
+ "c2", new DocCollection("c2", slices, new HashMap<>(),
DocRouter.DEFAULT, 0));
+
+ writer.enqueueUpdate(reader.getClusterState(),
Collections.singletonList(c1), null);
+ writer.writePendingUpdates();
+
+ byte[] data =
+ zkClient
+ .getZooKeeper()
+ .getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c2/state.json",
null, null);
+ assertTrue(compressor.isCompressedBytes(data));
+ Map<?, ?> map = (Map<?, ?>)
Utils.fromJSON(compressor.decompressBytes(data));
+ assertNotNull(map.get("c2"));
+ }
+
+ } finally {
+ IOUtils.close(zkClient);
+ server.shutdown();
+ }
+ }
}
diff --git a/solr/server/solr/solr.xml b/solr/server/solr/solr.xml
index 459dbff87f2..17783906c6c 100644
--- a/solr/server/solr/solr.xml
+++ b/solr/server/solr/solr.xml
@@ -50,6 +50,8 @@
<str
name="zkCredentialsInjector">${zkCredentialsInjector:org.apache.solr.common.cloud.DefaultZkCredentialsInjector}</str>
<bool
name="distributedClusterStateUpdates">${distributedClusterStateUpdates:false}</bool>
<bool
name="distributedCollectionConfigSetExecution">${distributedCollectionConfigSetExecution:false}</bool>
+ <int
name="minStateByteLenForCompression">${minStateByteLenForCompression:-1}</int>
+ <str
name="stateCompressor">${stateCompressor:org.apache.solr.common.util.ZLibCompressor}</str>
</solrcloud>
diff --git
a/solr/solr-ref-guide/modules/configuration-guide/pages/configuring-solr-xml.adoc
b/solr/solr-ref-guide/modules/configuration-guide/pages/configuring-solr-xml.adoc
index 1a6fd121f11..39d98f5d541 100644
---
a/solr/solr-ref-guide/modules/configuration-guide/pages/configuring-solr-xml.adoc
+++
b/solr/solr-ref-guide/modules/configuration-guide/pages/configuring-solr-xml.adoc
@@ -53,6 +53,8 @@ The default `solr.xml` file is found in
`$SOLR_TIP/server/solr/solr.xml` and loo
<str
name="zkCredentialsInjector">${zkCredentialsInjector:org.apache.solr.common.cloud.DefaultZkCredentialsInjector}</str>
<bool
name="distributedClusterStateUpdates">${distributedClusterStateUpdates:false}</bool>
<bool
name="distributedCollectionConfigSetExecution">${distributedCollectionConfigSetExecution:false}</bool>
+ <int
name="minStateByteLenForCompression">${minStateByteLenForCompression:-1}</int>
+<str
name="stateCompressor">${stateCompressor:org.apache.solr.common.util.ZLibCompressor}</str>
</solrcloud>
@@ -423,6 +425,24 @@ Optional parameters that can be specified if you are using
xref:deployment-guide
+
If `true`, the internal behavior of SolrCloud is changed to not use the
Overseer for collections' `state.json` updates but do this directly against
ZooKeeper.
+`minStateByteLenForCompression`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: -1
+|===
++
+Optional parameter to enable compression of `state.json`, both over the wire
and as stored in ZooKeeper. The value is the minimum size of `state.json` in
bytes at which compression is applied; any `state.json` larger than this
threshold will be compressed. The default is `-1`, meaning `state.json` is
never compressed.
+
+`stateCompressor`::
++
+[%autowidth,frame=none]
+|===
+|Optional |Default: `org.apache.solr.common.util.ZLibCompressor`
+|===
++
+Optional parameter to provide a compression implementation for `state.json`,
both over the wire and as stored in ZooKeeper. The value is the fully qualified
name of the class to use for state compression. This is only used if
`minStateByteLenForCompression` is set to a value above `-1`.
+
=== The <logging> Element
`class`::
diff --git
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 9070406d84e..361bcc0b7a3 100644
---
a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++
b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
+import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.LongAdder;
@@ -37,10 +38,12 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.StringUtils;
import org.apache.solr.common.annotation.JsonProperty;
import org.apache.solr.common.cloud.ConnectionManager.IsClosed;
+import org.apache.solr.common.util.Compressor;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.ReflectMapWriter;
import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.common.util.ZLibCompressor;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoAuthException;
@@ -78,6 +81,8 @@ public class SolrZkClient implements Closeable {
private final ZkMetrics metrics = new ZkMetrics();
+ private Compressor compressor;
+
public MapWriter getMetrics() {
return metrics::writeMap;
}
@@ -154,6 +159,28 @@ public class SolrZkClient implements Closeable {
BeforeReconnect beforeReconnect,
ZkACLProvider zkACLProvider,
IsClosed higherLevelIsClosed) {
+ this(
+ zkServerAddress,
+ zkClientTimeout,
+ clientConnectTimeout,
+ strat,
+ onReconnect,
+ beforeReconnect,
+ zkACLProvider,
+ higherLevelIsClosed,
+ null);
+ }
+
+ public SolrZkClient(
+ String zkServerAddress,
+ int zkClientTimeout,
+ int clientConnectTimeout,
+ ZkClientConnectionStrategy strat,
+ final OnReconnect onReconnect,
+ BeforeReconnect beforeReconnect,
+ ZkACLProvider zkACLProvider,
+ IsClosed higherLevelIsClosed,
+ Compressor compressor) {
this.zkServerAddress = zkServerAddress;
this.higherLevelIsClosed = higherLevelIsClosed;
if (strat == null) {
@@ -230,6 +257,12 @@ public class SolrZkClient implements Closeable {
} else {
this.zkACLProvider = zkACLProvider;
}
+
+ if (compressor == null) {
+ this.compressor = new ZLibCompressor();
+ } else {
+ this.compressor = compressor;
+ }
}
public ConnectionManager getConnectionManager() {
@@ -433,6 +466,18 @@ public class SolrZkClient implements Closeable {
} else {
result = keeper.getData(path, wrapWatcher(watcher), stat);
}
+ if (compressor.isCompressedBytes(result)) {
+ log.debug("Zookeeper data at path {} is compressed", path);
+ try {
+ result = compressor.decompressBytes(result);
+ } catch (Exception e) {
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ String.format(
+ Locale.ROOT, "Unable to decompress data at path: %s from
zookeeper", path),
+ e);
+ }
+ }
metrics.reads.increment();
if (result != null) {
metrics.bytesRead.add(result.length);
diff --git a/solr/solrj/src/java/org/apache/solr/common/util/Compressor.java
b/solr/solrj/src/java/org/apache/solr/common/util/Compressor.java
new file mode 100644
index 00000000000..26903b4cf19
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/util/Compressor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.util;
+
+// Interface for compression implementations, providing methods to compress
and decompress data
// Interface for compression implementations, providing methods to compress and decompress data
public interface Compressor {

  /**
   * Check to determine if the data is compressed in the expected compression implementation
   * (e.g. by probing for a format-specific magic header).
   *
   * @param data the bytes to check for compression; implementations should tolerate null/short
   *     input (ZLibCompressor returns false for them)
   * @return true if the data is compressed in the expected compression implementation
   */
  boolean isCompressedBytes(byte[] data);

  /**
   * Decompresses compressed bytes, returning the uncompressed data as a byte[]
   *
   * @param data the input compressed data to decompress
   * @return the decompressed bytes
   * @throws Exception The data is not compressed or the data is not compressed in the correct
   *     format
   */
  // NOTE(review): the raw `throws Exception` forces callers to catch Exception; a dedicated
  // exception type would be a cleaner contract — confirm with implementors before narrowing,
  // since existing implementations already declare this signature.
  byte[] decompressBytes(byte[] data) throws Exception;

  /**
   * Compresses bytes into compressed bytes using the compression implementation
   *
   * @param data the input uncompressed data to be compressed
   * @return compressed bytes
   */
  byte[] compressBytes(byte[] data);
}
diff --git
a/solr/solrj/src/java/org/apache/solr/common/util/ZLibCompressor.java
b/solr/solrj/src/java/org/apache/solr/common/util/ZLibCompressor.java
new file mode 100644
index 00000000000..61fcd3714a3
--- /dev/null
+++ b/solr/solrj/src/java/org/apache/solr/common/util/ZLibCompressor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.util;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+public class ZLibCompressor implements Compressor {
+
+ private static final byte[] ZLIB_MAGIC = new byte[] {0x78, 0x1};
+ private static final int COMPRESSED_SIZE_MAGIC_NUMBER = 2018370979;
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Uses the hex magic number for zlib compression '78 01' to check if the
bytes are compressed
+ */
+ @Override
+ public boolean isCompressedBytes(byte[] data) {
+ if (data == null || data.length < 2) return false;
+ return ZLIB_MAGIC[0] == data[0] && ZLIB_MAGIC[1] == data[1];
+ }
+
+ @Override
+ public byte[] decompressBytes(byte[] data) throws Exception {
+ if (data == null) return null;
+ Inflater inflater = new Inflater();
+ try {
+ inflater.setInput(data, 0, data.length);
+ // Attempt to get the decompressed size from trailing bytes, this will
be present if
+ // compressed by Solr
+ ByteBuffer bb = ByteBuffer.wrap(data, data.length - 8, 8);
+ int decompressedSize = bb.getInt();
+ int xoredSize = bb.getInt();
+ if ((decompressedSize ^ COMPRESSED_SIZE_MAGIC_NUMBER) != xoredSize) {
+ // Take best guess of decompressed size since it wasn't included in
trailing bytes, assume a
+ // 5:1 ratio
+ decompressedSize = 5 * data.length;
+ }
+ byte[] buf = new byte[decompressedSize];
+ int actualDecompressedSize = 0;
+ while (!inflater.finished()) {
+ if (actualDecompressedSize >= buf.length) {
+ buf = Arrays.copyOf(buf, (int) (buf.length * 1.5));
+ }
+ actualDecompressedSize += inflater.inflate(buf,
actualDecompressedSize, decompressedSize);
+ }
+ if (buf.length != actualDecompressedSize) {
+ buf = Arrays.copyOf(buf, actualDecompressedSize);
+ }
+ return buf;
+ } finally {
+ inflater.end();
+ }
+ }
+
+ @Override
+ public byte[] compressBytes(byte[] data) {
+ Deflater compressor = new Deflater(Deflater.BEST_SPEED);
+ try {
+ compressor.setInput(data);
+ compressor.finish();
+ byte[] buf = new byte[data.length + 8];
+ int compressedSize = 0;
+ while (!compressor.finished()) {
+ if (compressedSize >= buf.length) {
+ buf = Arrays.copyOf(buf, (int) (buf.length * 1.5));
+ }
+ compressedSize += compressor.deflate(buf, compressedSize, buf.length -
compressedSize);
+ }
+
+ buf = Arrays.copyOf(buf, compressedSize + 8);
+
+ // Include the decompressed size and xored decompressed size in trailing
bytes, this makes
+ // decompression
+ // efficient while also being compatible with alternative zlib
implementations
+ ByteBuffer.wrap(buf, compressedSize, 8)
+ .putInt(data.length)
+ .putInt(data.length ^ COMPRESSED_SIZE_MAGIC_NUMBER);
+ return buf;
+ } finally {
+ compressor.end();
+ }
+ }
+}
diff --git
a/solr/solrj/src/test/org/apache/solr/common/cloud/SolrZkClientCompressedDataTest.java
b/solr/solrj/src/test/org/apache/solr/common/cloud/SolrZkClientCompressedDataTest.java
new file mode 100644
index 00000000000..b8ab6f1e3d8
--- /dev/null
+++
b/solr/solrj/src/test/org/apache/solr/common/cloud/SolrZkClientCompressedDataTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.cloud;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.Map;
+import org.apache.lucene.util.IOUtils;
+import org.apache.solr.SolrTestCase;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cloud.ZkTestServer;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.common.util.ZLibCompressor;
+import org.apache.zookeeper.CreateMode;
+import org.junit.Test;
+
/**
 * Verifies that {@link SolrZkClient#getData} transparently decompresses node data that was stored
 * in ZooKeeper in zlib-compressed form.
 */
public class SolrZkClientCompressedDataTest extends SolrTestCase {

  @Test
  public void getData() throws Exception {
    Path zkDir = createTempDir("testGetData");

    ZkTestServer server = new ZkTestServer(zkDir);

    SolrZkClient zkClient = null;

    ZLibCompressor zLibStateCompression = new ZLibCompressor();

    try {
      server.run();

      zkClient = new SolrZkClient(server.getZkAddress(), 60000);
      ZkController.createClusterZkNodes(zkClient);
      zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);

      // A representative state.json payload for a single-shard collection "c1".
      String state =
          "{\"c1\":{\n"
              + "\"pullReplicas\":\"0\",\n"
              + "\"replicationFactor\":\"1\",\n"
              + "\"router\":{\"name\":\"compositeId\"},\n"
              + "\"maxShardsPerNode\":\"1\",\n"
              + "\"autoAddReplicas\":\"false\",\n"
              + "\"nrtReplicas\":\"1\",\n"
              + "\"tlogReplicas\":\"0\",\n"
              + "\"shards\":{\"shard1\":{\n"
              + "\"range\":\"80000000-7fffffff\",\n"
              + "\"state\":\"active\",\n"
              + "\"replicas\":{\"core_node2\":{\n"
              + "\"core\":\"test_shard1_replica_n1\",\n"
              + "\"node_name\":\"127.0.0.1:8983_solr\",\n"
              + "\"base_url\":\"http://127.0.0.1:8983/solr\",\n"
              + "\"state\":\"active\",\n"
              + "\"type\":\"NRT\",\n"
              + "\"force_set_state\":\"false\",\n"
              + "\"leader\":\"true\"}}}}}}";
      byte[] arr = state.getBytes(StandardCharsets.UTF_8);
      byte[] compressedData = zLibStateCompression.compressBytes(arr);
      ZkACLProvider aclProvider = new DefaultZkACLProvider();
      String path = ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json";
      // Write compressed bytes through the raw ZooKeeper handle so the SolrZkClient write path
      // is bypassed; only the read path's decompression is under test.
      zkClient
          .getZooKeeper()
          .create(path, compressedData, aclProvider.getACLsToAdd(path), CreateMode.PERSISTENT);

      // getData should detect the zlib magic header and return the decompressed payload.
      byte[] data =
          zkClient.getData(ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null, true);
      Map<?, ?> map = (Map<?, ?>) Utils.fromJSON(data);
      // Equal length to the original (uncompressed) bytes proves decompression happened.
      assertEquals(arr.length, data.length);
      assertNotNull(map.get("c1"));
    } finally {
      IOUtils.close(zkClient);
      server.shutdown();
    }
  }
}
diff --git
a/solr/solrj/src/test/org/apache/solr/common/util/ZLibCompressorTest.java
b/solr/solrj/src/test/org/apache/solr/common/util/ZLibCompressorTest.java
new file mode 100644
index 00000000000..be2d2975c0d
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/common/util/ZLibCompressorTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.common.util;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.solr.SolrTestCase;
+import org.junit.Test;
+
+public class ZLibCompressorTest extends SolrTestCase {
+
+ private ZLibCompressor stateCompression = new ZLibCompressor();
+
+ @Test
+ public void isCompressedBytes() {
+ assertFalse(stateCompression.isCompressedBytes(null));
+ assertFalse(stateCompression.isCompressedBytes(new byte[0]));
+ assertFalse(stateCompression.isCompressedBytes(new byte[1]));
+ assertFalse(stateCompression.isCompressedBytes(new byte[2]));
+ assertFalse(stateCompression.isCompressedBytes(new byte[3]));
+
+ Random rd = random();
+ byte[] arr = new byte[500];
+ rd.nextBytes(arr);
+
+ byte[] compressedBytes = stateCompression.compressBytes(arr);
+ assertFalse(stateCompression.isCompressedBytes(arr));
+ assertTrue(stateCompression.isCompressedBytes(compressedBytes));
+ }
+
+ @Test
+ public void decompressCompressedBytes() throws Exception {
+ // "Some test data\n" as compressed bytes
+ byte[] testBytes =
+ new byte[] {
+ 120, 1, 11, -50, -49, 77, 85, 40, 73, 45, 46, 81, 72, 73, 44, 73,
-28, 2, 0, 43, -36, 5,
+ 57
+ };
+ byte[] decompressedBytes = stateCompression.decompressBytes(testBytes);
+ assertEquals("Some test data\n", new String(decompressedBytes,
StandardCharsets.UTF_8));
+ }
+
+ @Test
+ public void compressBytes() {
+ // "Some test data\n" as compressed bytes
+ byte[] testBytes =
+ new byte[] {
+ 120, 1, 11, -50, -49, 77, 85, 40, 73, 45, 46, 81, 72, 73, 44, 73,
-28, 2, 0, 43, -36, 5,
+ 57
+ };
+ byte[] compressedBytes =
+ stateCompression.compressBytes("Some test
data\n".getBytes(StandardCharsets.UTF_8));
+ int decompressedSize = ByteBuffer.wrap(compressedBytes,
compressedBytes.length - 8, 4).getInt();
+ int xoredSize = ByteBuffer.wrap(compressedBytes, compressedBytes.length -
4, 4).getInt();
+ assertEquals(xoredSize, decompressedSize ^ 2018370979);
+ assertEquals("Some test data\n".getBytes(StandardCharsets.UTF_8).length,
decompressedSize);
+ assertArrayEquals(
+ testBytes, ArrayUtil.copyOfSubArray(compressedBytes, 0,
compressedBytes.length - 8));
+ }
+}