This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/curator.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eb6f9ae3 CURATOR-725: Allow for global compression (#512)
8eb6f9ae3 is described below

commit 8eb6f9ae382059b216876112d6c3f76e824e379d
Author: Houston Putman <hous...@apache.org>
AuthorDate: Thu Jan 16 22:50:16 2025 -0600

    CURATOR-725: Allow for global compression (#512)
---
 .../apache/curator/framework/CuratorFramework.java |   7 +
 .../curator/framework/CuratorFrameworkFactory.java |  17 +
 .../apache/curator/framework/api/Compressible.java |   8 +
 .../curator/framework/api/Decompressible.java      |   8 +
 .../curator/framework/imps/CreateBuilderImpl.java  |  19 +-
 .../framework/imps/CuratorFrameworkImpl.java       |   8 +
 .../curator/framework/imps/GetDataBuilderImpl.java |  13 +-
 .../curator/framework/imps/SetDataBuilderImpl.java |  19 +-
 .../framework/imps/TempGetDataBuilderImpl.java     |   8 +-
 .../curator/framework/imps/TestCompression.java    |  69 ++++
 .../imps/TestCompressionInTransactionNew.java      |  64 +++
 .../imps/TestCompressionInTransactionOld.java      |  76 ++++
 .../curator/x/async/api/AsyncGetDataBuilder.java   |  18 +
 .../curator/x/async/api/AsyncSetDataBuilder.java   |  18 +
 .../x/async/api/AsyncTransactionCreateBuilder.java |   8 +
 .../async/api/AsyncTransactionSetDataBuilder.java  |  17 +
 .../apache/curator/x/async/api/CreateOption.java   |   5 +
 .../x/async/details/AsyncCreateBuilderImpl.java    |   4 +-
 .../x/async/details/AsyncGetDataBuilderImpl.java   |  16 +-
 .../x/async/details/AsyncSetDataBuilderImpl.java   |  16 +-
 .../x/async/details/AsyncTransactionOpImpl.java    |  31 +-
 .../modeled/details/ModeledFrameworkImpl.java      |  40 +-
 .../curator/x/async/TestBasicOperations.java       | 184 +++++++++
 .../x/async/modeled/TestModeledFramework.java      | 438 +++++++++++++++++++++
 .../x/async/modeled/TestModeledFrameworkBase.java  |  21 +-
 25 files changed, 1100 insertions(+), 32 deletions(-)

diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 50f6e56ba..56b883388 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -345,6 +345,13 @@ public interface CuratorFramework extends Closeable {
      */
     SchemaSet getSchemaSet();
 
+    /**
+     * Return whether compression is enabled by default for all create, setData and getData operations.
+     *
+     * @return if compression is enabled
+     */
+    boolean compressionEnabled();
+
     /**
      * Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is
      * done from the {@link #runSafe(Runnable)} thread.
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index d24b56d64..87f0e9f17 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -163,6 +163,7 @@ public class CuratorFrameworkFactory {
         private List<AuthInfo> authInfos = null;
         private byte[] defaultData = LOCAL_ADDRESS;
         private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER;
+        private boolean compressionEnabled = false;
         private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
         private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
         private boolean canBeReadOnly = false;
@@ -367,6 +368,18 @@ public class CuratorFrameworkFactory {
             return this;
         }
 
+        /**
+         * By default, each write or read call must explicitly use compression.
+         * Call this method to enable compression by default on all read and write calls.
+         * <p>
+         * In order to implement filtered compression, use this option and a custom {@link CompressionProvider} that only compresses and decompresses the zNodes that match the desired filter.
+         * @return this
+         */
+        public Builder enableCompression() {
+            this.compressionEnabled = true;
+            return this;
+        }
+
         /**
          * @param zookeeperFactory the zookeeper factory to use
          * @return this
@@ -542,6 +555,10 @@ public class CuratorFrameworkFactory {
             return compressionProvider;
         }
 
+        public boolean compressionEnabled() {
+            return compressionEnabled;
+        }
+
         public ThreadFactory getThreadFactory() {
             return threadFactory;
         }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java
index 1aaca83c7..4d0feef32 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java
@@ -26,4 +26,12 @@ public interface Compressible<T> {
      * @return this
      */
     public T compressed();
+
+    /**
+     * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework}
+     * has compressionEnabled
+     *
+     * @return this
+     */
+    public T uncompressed();
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java
index b6e6bbba9..a60abb55c 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java
@@ -26,4 +26,12 @@ public interface Decompressible<T> {
      * @return this
      */
     public T decompressed();
+
+    /**
+     * Cause the data to not be de-compressed, even if the {@link org.apache.curator.framework.CuratorFramework}
+     * has compressionEnabled
+     *
+     * @return this
+     */
+    public T undecompressed();
 }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index 5487f8560..b76ce98f4 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -100,7 +100,7 @@ public class CreateBuilderImpl
         acling = new ACLing(client.getAclProvider());
         createParentsIfNeeded = false;
         createParentsAsContainers = false;
-        compress = false;
+        compress = client.compressionEnabled();
         setDataIfExists = false;
         storingStat = null;
         ttl = -1;
@@ -193,6 +193,12 @@ public class CreateBuilderImpl
                 return this;
             }
 
+            @Override
+            public ACLCreateModePathAndBytesable<T> uncompressed() {
+                CreateBuilderImpl.this.uncompressed();
+                return this;
+            }
+
             @Override
             public T forPath(String path) throws Exception {
                 return forPath(path, client.getDefaultData());
@@ -216,7 +222,16 @@ public class CreateBuilderImpl
 
     @Override
     public CreateBackgroundModeStatACLable compressed() {
-        compress = true;
+        return withCompression(true);
+    }
+
+    @Override
+    public CreateBackgroundModeStatACLable uncompressed() {
+        return withCompression(false);
+    }
+
+    private CreateBackgroundModeStatACLable withCompression(boolean compress) {
+        this.compress = compress;
         return new CreateBackgroundModeStatACLable() {
             @Override
             public CreateBackgroundModeACLable storingStatIn(Stat stat) {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index fa2360e6c..89f846f4a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -109,6 +109,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
     private final FailedDeleteManager failedDeleteManager;
     private final FailedRemoveWatchManager failedRemoveWatcherManager;
     private final CompressionProvider compressionProvider;
+    private final boolean compressionEnabled;
     private final ACLProvider aclProvider;
     private final NamespaceFacadeCache namespaceFacadeCache;
     private final boolean useContainerParentsIfAvailable;
@@ -185,6 +186,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
                 builder.getSimulatedSessionExpirationPercent(),
                 builder.getConnectionStateListenerManagerFactory());
         compressionProvider = builder.getCompressionProvider();
+        compressionEnabled = builder.compressionEnabled();
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
         useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
@@ -284,6 +286,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
         failedDeleteManager = parent.failedDeleteManager;
         failedRemoveWatcherManager = parent.failedRemoveWatcherManager;
         compressionProvider = parent.compressionProvider;
+        compressionEnabled = parent.compressionEnabled;
         aclProvider = parent.aclProvider;
         namespaceFacadeCache = parent.namespaceFacadeCache;
         namespace = parent.namespace;
@@ -628,6 +631,11 @@ public class CuratorFrameworkImpl implements CuratorFramework {
         return schemaSet;
     }
 
+    @Override
+    public boolean compressionEnabled() {
+        return compressionEnabled;
+    }
+
     ACLProvider getAclProvider() {
         return aclProvider;
     }
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
index 7219cb518..df6d15cbd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java
@@ -46,7 +46,7 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<S
         responseStat = null;
         watching = new Watching(client);
         backgrounding = new Backgrounding();
-        decompress = false;
+        decompress = client.compressionEnabled();
     }
 
     public GetDataBuilderImpl(
@@ -64,7 +64,16 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<S
 
     @Override
     public GetDataWatchBackgroundStatable decompressed() {
-        decompress = true;
+        return withDecompression(true);
+    }
+
+    @Override
+    public GetDataWatchBackgroundStatable undecompressed() {
+        return withDecompression(false);
+    }
+
+    private GetDataWatchBackgroundStatable withDecompression(boolean decompress) {
+        this.decompress = decompress;
         return new GetDataWatchBackgroundStatable() {
             @Override
             public ErrorListenerPathable<byte[]> inBackground() {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
index 73ba8a540..98b21e33a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java
@@ -54,7 +54,7 @@ public class SetDataBuilderImpl
         this.client = client;
         backgrounding = new Backgrounding();
         version = -1;
-        compress = false;
+        compress = client.compressionEnabled();
     }
 
     public SetDataBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, int version, boolean compress) {
@@ -94,12 +94,27 @@ public class SetDataBuilderImpl
                 compress = true;
                 return this;
             }
+
+            @Override
+            public VersionPathAndBytesable<T> uncompressed() {
+                compress = false;
+                return this;
+            }
         };
     }
 
     @Override
     public SetDataBackgroundVersionable compressed() {
-        compress = true;
+        return withCompression(true);
+    }
+
+    @Override
+    public SetDataBackgroundVersionable uncompressed() {
+        return withCompression(false);
+    }
+
+    public SetDataBackgroundVersionable withCompression(boolean compress) {
+        this.compress = compress;
         return new SetDataBackgroundVersionable() {
             @Override
             public ErrorListenerPathAndBytesable<Stat> inBackground() {
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java
index 63c9706c9..fabf255dd 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java
@@ -35,7 +35,7 @@ class TempGetDataBuilderImpl implements TempGetDataBuilder {
     TempGetDataBuilderImpl(CuratorFrameworkImpl client) {
         this.client = client;
         responseStat = null;
-        decompress = false;
+        decompress = client.compressionEnabled();
     }
 
     @Override
@@ -44,6 +44,12 @@ class TempGetDataBuilderImpl implements TempGetDataBuilder {
         return this;
     }
 
+    @Override
+    public StatPathable<byte[]> undecompressed() {
+        decompress = false;
+        return this;
+    }
+
     @Override
     public Pathable<byte[]> storingStatIn(Stat stat) {
         responseStat = stat;
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java
index 691b74374..9288939ee 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java
@@ -22,7 +22,9 @@ package org.apache.curator.framework.imps;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.ZipException;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.CompressionProvider;
@@ -97,6 +99,73 @@ public class TestCompression extends BaseClassForTests {
         }
     }
 
+    @Test
+    public void testSetDataGlobalCompression() throws Exception {
+        final byte[] data = "here's a string".getBytes();
+        final byte[] gzipedData = GzipCompressionProvider.doCompress(data);
+
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .retryPolicy(new RetryOneTime(1))
+                .enableCompression()
+                .build();
+        try {
+            client.start();
+
+            // Create with explicit compression
+            
client.create().compressed().creatingParentsIfNeeded().forPath("/a/b/c", data);
+            assertArrayEquals(data, 
client.getData().decompressed().forPath("/a/b/c"));
+            assertArrayEquals(data, client.getData().forPath("/a/b/c"));
+            assertArrayEquals(gzipedData, 
client.getData().undecompressed().forPath("/a/b/c"));
+            assertEquals(
+                    gzipedData.length, 
client.checkExists().forPath("/a/b/c").getDataLength());
+
+            // Create explicitly without compression
+            client.delete().forPath("/a/b/c");
+            
client.create().uncompressed().creatingParentsIfNeeded().forPath("/a/b/c", 
data);
+            assertArrayEquals(data, 
client.getData().undecompressed().forPath("/a/b/c"));
+            assertThrows(
+                    ZipException.class, () -> 
client.getData().decompressed().forPath("/a/b/c"));
+            assertThrows(ZipException.class, () -> 
client.getData().forPath("/a/b/c"));
+            assertEquals(data.length, 
client.checkExists().forPath("/a/b/c").getDataLength());
+
+            // Create with implicit (global) compression
+            client.delete().forPath("/a/b/c");
+            client.create().creatingParentsIfNeeded().forPath("/a/b/c", data);
+            assertArrayEquals(data, 
client.getData().decompressed().forPath("/a/b/c"));
+            assertArrayEquals(data, client.getData().forPath("/a/b/c"));
+            assertArrayEquals(gzipedData, 
client.getData().undecompressed().forPath("/a/b/c"));
+            assertEquals(
+                    gzipedData.length, 
client.checkExists().forPath("/a/b/c").getDataLength());
+
+            // SetData with explicit compression
+            client.setData().compressed().forPath("/a/b/c", data);
+            assertArrayEquals(data, client.getData().forPath("/a/b/c"));
+            assertArrayEquals(data, 
client.getData().decompressed().forPath("/a/b/c"));
+            assertArrayEquals(gzipedData, 
client.getData().undecompressed().forPath("/a/b/c"));
+            assertEquals(
+                    gzipedData.length, 
client.checkExists().forPath("/a/b/c").getDataLength());
+
+            // SetData explicitly without compression
+            client.setData().uncompressed().forPath("/a/b/c", data);
+            assertArrayEquals(data, 
client.getData().undecompressed().forPath("/a/b/c"));
+            assertThrows(
+                    ZipException.class, () -> 
client.getData().decompressed().forPath("/a/b/c"));
+            assertThrows(ZipException.class, () -> 
client.getData().forPath("/a/b/c"));
+            assertEquals(data.length, 
client.checkExists().forPath("/a/b/c").getDataLength());
+
+            // SetData with implicit (global) compression
+            client.setData().forPath("/a/b/c", data);
+            assertArrayEquals(data, client.getData().forPath("/a/b/c"));
+            assertArrayEquals(data, 
client.getData().decompressed().forPath("/a/b/c"));
+            assertArrayEquals(gzipedData, 
client.getData().undecompressed().forPath("/a/b/c"));
+            assertEquals(
+                    gzipedData.length, 
client.checkExists().forPath("/a/b/c").getDataLength());
+        } finally {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
     @Test
     public void testSimple() throws Exception {
         final byte[] data = "here's a string".getBytes();
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java
index 7d389e351..f7389ce07 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java
@@ -20,8 +20,11 @@
 package org.apache.curator.framework.imps;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.util.zip.ZipException;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.transaction.CuratorOp;
@@ -145,4 +148,65 @@ public class TestCompressionInTransactionNew extends BaseClassForTests {
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test
+    public void testGlobalCompression() throws Exception {
+        final String path1 = "/a";
+        final String path2 = "/b";
+
+        final byte[] data1 = "here's a string".getBytes();
+        final byte[] data2 = "here's another string".getBytes();
+        final byte[] gzipedData1 = GzipCompressionProvider.doCompress(data1);
+        final byte[] gzipedData2 = GzipCompressionProvider.doCompress(data2);
+
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .retryPolicy(new RetryOneTime(1))
+                .enableCompression()
+                .build();
+        try {
+            client.start();
+
+            // Create the nodes in a transaction
+            // path1 is compressed (globally)
+            // path2 is uncompressed (override)
+            CuratorOp op1 = client.transactionOp().create().forPath(path1, 
data1);
+            CuratorOp op2 = 
client.transactionOp().create().uncompressed().forPath(path2, data2);
+            client.transaction().forOperations(op1, op2);
+
+            // Check they exist
+            assertNotNull(client.checkExists().forPath(path1));
+            assertEquals(gzipedData1.length, 
client.checkExists().forPath(path1).getDataLength());
+            assertNotNull(client.checkExists().forPath(path2));
+            assertEquals(data2.length, 
client.checkExists().forPath(path2).getDataLength());
+            assertArrayEquals(data1, client.getData().forPath(path1));
+            assertArrayEquals(data1, 
client.getData().decompressed().forPath(path1));
+            assertArrayEquals(gzipedData1, 
client.getData().undecompressed().forPath(path1));
+            assertArrayEquals(data2, 
client.getData().undecompressed().forPath(path2));
+            assertThrows(
+                    ZipException.class, () -> 
client.getData().decompressed().forPath(path2));
+            assertThrows(ZipException.class, () -> 
client.getData().forPath(path2));
+
+            // Set data in transaction
+            // path1 is uncompressed (override)
+            // path2 is compressed (globally)
+            op1 = 
client.transactionOp().setData().uncompressed().forPath(path1, data1);
+            op2 = client.transactionOp().setData().forPath(path2, data2);
+            client.transaction().forOperations(op1, op2);
+
+            assertNotNull(client.checkExists().forPath(path1));
+            assertEquals(data1.length, 
client.checkExists().forPath(path1).getDataLength());
+            assertNotNull(client.checkExists().forPath(path2));
+            assertEquals(gzipedData2.length, 
client.checkExists().forPath(path2).getDataLength());
+            assertArrayEquals(data1, 
client.getData().undecompressed().forPath(path1));
+            assertThrows(
+                    ZipException.class, () -> 
client.getData().decompressed().forPath(path1));
+            assertThrows(ZipException.class, () -> 
client.getData().forPath(path1));
+            assertArrayEquals(data2, 
client.getData().decompressed().forPath(path2));
+            assertArrayEquals(data2, client.getData().forPath(path2));
+            assertArrayEquals(gzipedData2, 
client.getData().undecompressed().forPath(path2));
+        } finally {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 }
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java
index d347e20fe..9e8958f2e 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java
@@ -20,8 +20,11 @@
 package org.apache.curator.framework.imps;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.util.zip.ZipException;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
@@ -172,4 +175,77 @@ public class TestCompressionInTransactionOld extends BaseClassForTests {
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test
+    public void testGlobalCompression() throws Exception {
+        final String path1 = "/a";
+        final String path2 = "/b";
+
+        final byte[] data1 = "here's a string".getBytes();
+        final byte[] data2 = "here's another string".getBytes();
+        final byte[] gzipedData1 = GzipCompressionProvider.doCompress(data1);
+        final byte[] gzipedData2 = GzipCompressionProvider.doCompress(data2);
+
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .retryPolicy(new RetryOneTime(1))
+                .enableCompression()
+                .build();
+        try {
+            client.start();
+
+            // Create the nodes in a transaction
+            // path1 is compressed (globally)
+            // path2 is uncompressed (override)
+            client.inTransaction()
+                    .create()
+                    .forPath(path1, data1)
+                    .and()
+                    .create()
+                    .uncompressed()
+                    .forPath(path2, data2)
+                    .and()
+                    .commit();
+
+            // Check they exist
+            assertNotNull(client.checkExists().forPath(path1));
+            assertEquals(gzipedData1.length, 
client.checkExists().forPath(path1).getDataLength());
+            assertNotNull(client.checkExists().forPath(path2));
+            assertEquals(data2.length, 
client.checkExists().forPath(path2).getDataLength());
+            assertArrayEquals(data1, client.getData().forPath(path1));
+            assertArrayEquals(data1, 
client.getData().decompressed().forPath(path1));
+            assertArrayEquals(gzipedData1, 
client.getData().undecompressed().forPath(path1));
+            assertArrayEquals(data2, 
client.getData().undecompressed().forPath(path2));
+            assertThrows(
+                    ZipException.class, () -> 
client.getData().decompressed().forPath(path2));
+            assertThrows(ZipException.class, () -> 
client.getData().forPath(path2));
+
+            // Set data in transaction
+            // path1 is uncompressed (override)
+            // path2 is compressed (globally)
+            client.inTransaction()
+                    .setData()
+                    .uncompressed()
+                    .forPath(path1, data1)
+                    .and()
+                    .setData()
+                    .forPath(path2, data2)
+                    .and()
+                    .commit();
+
+            assertNotNull(client.checkExists().forPath(path1));
+            assertEquals(data1.length, 
client.checkExists().forPath(path1).getDataLength());
+            assertNotNull(client.checkExists().forPath(path2));
+            assertEquals(gzipedData2.length, 
client.checkExists().forPath(path2).getDataLength());
+            assertArrayEquals(data1, 
client.getData().undecompressed().forPath(path1));
+            assertThrows(
+                    ZipException.class, () -> 
client.getData().decompressed().forPath(path1));
+            assertThrows(ZipException.class, () -> 
client.getData().forPath(path1));
+            assertArrayEquals(data2, 
client.getData().decompressed().forPath(path2));
+            assertArrayEquals(data2, client.getData().forPath(path2));
+            assertArrayEquals(gzipedData2, 
client.getData().undecompressed().forPath(path2));
+        } finally {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
 }
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncGetDataBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncGetDataBuilder.java
index 1840e39b3..9c4c98d26 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncGetDataBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncGetDataBuilder.java
@@ -33,6 +33,14 @@ public interface AsyncGetDataBuilder extends AsyncPathable<AsyncStage<byte[]>> {
      */
     AsyncPathable<AsyncStage<byte[]>> decompressed();
 
+    /**
+     * Cause the data to not be de-compressed, even if the {@link org.apache.curator.framework.CuratorFramework}
+     * has compressionEnabled
+     *
+     * @return this
+     */
+    AsyncPathable<AsyncStage<byte[]>> undecompressed();
+
     /**
      * Have the operation fill the provided stat object
      *
@@ -50,4 +58,14 @@ public interface AsyncGetDataBuilder extends AsyncPathable<AsyncStage<byte[]>> {
      * @return this
      */
     AsyncPathable<AsyncStage<byte[]>> decompressedStoringStatIn(Stat stat);
+
+    /**
+     * Have the operation fill the provided stat object without the data being de-compressed
+     *
+     * @param stat the stat to have filled in
+     * @see #undecompressed()
+     * @see #storingStatIn(org.apache.zookeeper.data.Stat)
+     * @return this
+     */
+    AsyncPathable<AsyncStage<byte[]>> undecompressedStoringStatIn(Stat stat);
 }
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncSetDataBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncSetDataBuilder.java
index 972ddcfaa..8208ff551 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncSetDataBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncSetDataBuilder.java
@@ -33,6 +33,14 @@ public interface AsyncSetDataBuilder extends AsyncPathAndBytesable<AsyncStage<St
      */
     AsyncPathAndBytesable<AsyncStage<Stat>> compressed();
 
+    /**
+     * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework}
+     * has compressionEnabled
+     *
+     * @return this
+     */
+    AsyncPathAndBytesable<AsyncStage<Stat>> uncompressed();
+
     /**
      * Cause the data to be compressed using the configured compression provider.
      * Only sets if the version matches. By default -1 is used
@@ -43,6 +51,16 @@ public interface AsyncSetDataBuilder extends 
AsyncPathAndBytesable<AsyncStage<St
      */
     AsyncPathAndBytesable<AsyncStage<Stat>> compressedWithVersion(int version);
 
+    /**
+     * Cause the data to be uncompressed, even if the {@link 
org.apache.curator.framework.CuratorFramework}
+     * has compressionEnabled. Only sets if the version matches. By default -1 
is used
+     * which matches all versions.
+     *
+     * @param version version
+     * @return this
+     */
+    AsyncPathAndBytesable<AsyncStage<Stat>> uncompressedWithVersion(int 
version);
+
     /**
      * Only sets if the version matches. By default -1 is used
      * which matches all versions.
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java
index 1bb7de523..fc9bd77d8 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java
@@ -51,6 +51,14 @@ public interface AsyncTransactionCreateBuilder extends 
AsyncPathAndBytesable<Cur
      */
     AsyncPathAndBytesable<CuratorOp> compressed();
 
+    /**
+     * Cause the data to be uncompressed, even if the {@link 
org.apache.curator.framework.CuratorFramework}
+     * has compressionEnabled
+     *
+     * @return this
+     */
+    AsyncPathAndBytesable<CuratorOp> uncompressed();
+
     /**
      * Specify a TTL when mode is {@link 
org.apache.zookeeper.CreateMode#PERSISTENT_WITH_TTL} or
      * {@link org.apache.zookeeper.CreateMode#PERSISTENT_SEQUENTIAL_WITH_TTL}. 
If
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionSetDataBuilder.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionSetDataBuilder.java
index 37f45a1d2..82a754f79 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionSetDataBuilder.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionSetDataBuilder.java
@@ -40,6 +40,14 @@ public interface AsyncTransactionSetDataBuilder extends 
AsyncPathAndBytesable<Cu
      */
     AsyncPathAndBytesable<CuratorOp> compressed();
 
+    /**
+     * Cause the data to be uncompressed, even if the {@link 
org.apache.curator.framework.CuratorFramework}
+     * has compressionEnabled
+     *
+     * @return this
+     */
+    AsyncPathAndBytesable<CuratorOp> uncompressed();
+
     /**
      * Cause the data to be compressed using the configured compression 
provider.
      * Also changes the version number used. By default, -1 is used
@@ -48,4 +56,13 @@ public interface AsyncTransactionSetDataBuilder extends 
AsyncPathAndBytesable<Cu
      * @return this
      */
     AsyncPathAndBytesable<CuratorOp> withVersionCompressed(int version);
+
+    /**
+     * Cause the data to be uncompressed, even if the {@link 
org.apache.curator.framework.CuratorFramework}
+     * has compressionEnabled. Also changes the version number used. By 
default, -1 is used
+     *
+     * @param version version to use
+     * @return this
+     */
+    AsyncPathAndBytesable<CuratorOp> withVersionUncompressed(int version);
 }
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/CreateOption.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/CreateOption.java
index 6214f9c1d..6b6a005fe 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/api/CreateOption.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/api/CreateOption.java
@@ -69,6 +69,11 @@ public enum CreateOption {
      */
     compress,
 
+    /**
+     * Cause the data to be stored uncompressed, even if the client has compression enabled
+     */
+    uncompress,
+
     /**
      * If the ZNode already exists, Curator will instead call setData()
      */
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
index 5ab579934..6f9a6c045 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java
@@ -167,7 +167,9 @@ class AsyncCreateBuilderImpl implements AsyncCreateBuilder {
                         || 
options.contains(CreateOption.createParentsAsContainers),
                 options.contains(CreateOption.createParentsAsContainers),
                 options.contains(CreateOption.doProtected),
-                options.contains(CreateOption.compress),
+                options.contains(CreateOption.compress)
+                        ? true
+                        : options.contains(CreateOption.uncompress) ? false : 
client.compressionEnabled(),
                 options.contains(CreateOption.setDataIfExists),
                 aclList,
                 stat,
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
index a94d1052b..1498968a1 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java
@@ -33,13 +33,14 @@ class AsyncGetDataBuilderImpl implements 
AsyncGetDataBuilder {
     private final CuratorFrameworkImpl client;
     private final Filters filters;
     private final WatchMode watchMode;
-    private boolean decompressed = false;
+    private boolean decompressed;
     private Stat stat = null;
 
     AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, Filters filters, 
WatchMode watchMode) {
         this.client = client;
         this.filters = filters;
         this.watchMode = watchMode;
+        this.decompressed = client.compressionEnabled();
     }
 
     @Override
@@ -48,6 +49,12 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder 
{
         return this;
     }
 
+    @Override
+    public AsyncPathable<AsyncStage<byte[]>> undecompressed() {
+        decompressed = false;
+        return this;
+    }
+
     @Override
     public AsyncPathable<AsyncStage<byte[]>> storingStatIn(Stat stat) {
         this.stat = stat;
@@ -61,6 +68,13 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder 
{
         return this;
     }
 
+    @Override
+    public AsyncPathable<AsyncStage<byte[]>> undecompressedStoringStatIn(Stat 
stat) {
+        decompressed = false;
+        this.stat = stat;
+        return this;
+    }
+
     @Override
     public AsyncStage<byte[]> forPath(String path) {
         BuilderCommon<byte[]> common = new BuilderCommon<>(filters, watchMode, 
dataProc);
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
index af1452457..0de17249d 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java
@@ -31,12 +31,13 @@ import org.apache.zookeeper.data.Stat;
 class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder {
     private final CuratorFrameworkImpl client;
     private final Filters filters;
-    private boolean compressed = false;
+    private boolean compressed;
     private int version = -1;
 
     AsyncSetDataBuilderImpl(CuratorFrameworkImpl client, Filters filters) {
         this.client = client;
         this.filters = filters;
+        this.compressed = client.compressionEnabled();
     }
 
     @Override
@@ -55,6 +56,12 @@ class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder 
{
         return this;
     }
 
+    @Override
+    public AsyncPathAndBytesable<AsyncStage<Stat>> uncompressed() {
+        compressed = false;
+        return this;
+    }
+
     @Override
     public AsyncPathAndBytesable<AsyncStage<Stat>> compressedWithVersion(int 
version) {
         compressed = true;
@@ -62,6 +69,13 @@ class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder 
{
         return this;
     }
 
+    @Override
+    public AsyncPathAndBytesable<AsyncStage<Stat>> uncompressedWithVersion(int 
version) {
+        compressed = false;
+        this.version = version;
+        return this;
+    }
+
     @Override
     public AsyncPathAndBytesable<AsyncStage<Stat>> withVersion(int version) {
         this.version = version;
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
index e9ccbf16d..1ef3d23d4 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java
@@ -50,7 +50,7 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp {
         return new AsyncTransactionCreateBuilder() {
             private List<ACL> aclList = null;
             private CreateMode createMode = CreateMode.PERSISTENT;
-            private boolean compressed = false;
+            private boolean compressed = client.compressionEnabled();
             private long ttl = -1;
 
             @Override
@@ -71,6 +71,12 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp {
                 return this;
             }
 
+            @Override
+            public AsyncPathAndBytesable<CuratorOp> uncompressed() {
+                compressed = false;
+                return this;
+            }
+
             @Override
             public AsyncPathAndBytesable<CuratorOp> withTtl(long ttl) {
                 this.ttl = ttl;
@@ -107,8 +113,9 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp {
                 TransactionCreateBuilder2<CuratorOp> builder1 = (ttl > 0)
                         ? client.transactionOp().create().withTtl(ttl)
                         : client.transactionOp().create();
-                ACLPathAndBytesable<CuratorOp> builder2 =
-                        compressed ? 
builder1.compressed().withMode(createMode) : builder1.withMode(createMode);
+                ACLPathAndBytesable<CuratorOp> builder2 = compressed
+                        ? builder1.compressed().withMode(createMode)
+                        : builder1.uncompressed().withMode(createMode);
                 PathAndBytesable<CuratorOp> builder3 = 
builder2.withACL(aclList);
                 try {
                     return useData ? builder3.forPath(path, data) : 
builder3.forPath(path);
@@ -145,7 +152,7 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp {
     public AsyncTransactionSetDataBuilder setData() {
         return new AsyncTransactionSetDataBuilder() {
             private int version = -1;
-            private boolean compressed = false;
+            private boolean compressed = client.compressionEnabled();
 
             @Override
             public AsyncPathAndBytesable<CuratorOp> withVersion(int version) {
@@ -159,6 +166,12 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp 
{
                 return this;
             }
 
+            @Override
+            public AsyncPathAndBytesable<CuratorOp> uncompressed() {
+                compressed = false;
+                return this;
+            }
+
             @Override
             public AsyncPathAndBytesable<CuratorOp> withVersionCompressed(int 
version) {
                 this.version = version;
@@ -166,6 +179,13 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp 
{
                 return this;
             }
 
+            @Override
+            public AsyncPathAndBytesable<CuratorOp> 
withVersionUncompressed(int version) {
+                this.version = version;
+                compressed = false;
+                return this;
+            }
+
             @Override
             public CuratorOp forPath(String path, byte[] data) {
                 return internalForPath(path, data, true);
@@ -179,7 +199,8 @@ class AsyncTransactionOpImpl implements AsyncTransactionOp {
             private CuratorOp internalForPath(String path, byte[] data, 
boolean useData) {
                 TransactionSetDataBuilder<CuratorOp> builder1 =
                         client.transactionOp().setData();
-                VersionPathAndBytesable<CuratorOp> builder2 = compressed ? 
builder1.compressed() : builder1;
+                VersionPathAndBytesable<CuratorOp> builder2 =
+                        compressed ? builder1.compressed() : 
builder1.uncompressed();
                 PathAndBytesable<CuratorOp> builder3 = 
builder2.withVersion(version);
                 try {
                     return useData ? builder3.forPath(path, data) : 
builder3.forPath(path);
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index 01e95decc..c02c93127 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -213,8 +213,12 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T> {
         try {
             byte[] bytes = modelSpec.serializer().serialize(item);
             AsyncSetDataBuilder dataBuilder = dslClient.setData();
-            AsyncPathAndBytesable<AsyncStage<Stat>> next =
-                    isCompressed() ? 
dataBuilder.compressedWithVersion(version) : dataBuilder.withVersion(version);
+            AsyncPathAndBytesable<AsyncStage<Stat>> next;
+            if (isCompressed()) {
+                next = dataBuilder.compressedWithVersion(version);
+            } else {
+                next = dataBuilder.uncompressedWithVersion(version);
+            }
             return next.forPath(resolveForSet(item), bytes);
         } catch (Exception e) {
             return ModelStage.exceptionally(e);
@@ -352,11 +356,7 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T> {
     public CuratorOp createOp(T model) {
         return client.transactionOp()
                 .create()
-                .withOptions(
-                        modelSpec.createMode(),
-                        fixAclList(modelSpec.aclList()),
-                        
modelSpec.createOptions().contains(CreateOption.compress),
-                        modelSpec.ttl())
+                .withOptions(modelSpec.createMode(), 
fixAclList(modelSpec.aclList()), isCompressed(), modelSpec.ttl())
                 .forPath(resolveForSet(model), 
modelSpec.serializer().serialize(model));
     }
 
@@ -368,12 +368,13 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T> {
     @Override
     public CuratorOp updateOp(T model, int version) {
         AsyncTransactionSetDataBuilder builder = 
client.transactionOp().setData();
+        AsyncPathAndBytesable<CuratorOp> builder2;
         if (isCompressed()) {
-            return builder.withVersionCompressed(version)
-                    .forPath(resolveForSet(model), 
modelSpec.serializer().serialize(model));
+            builder2 = builder.withVersionCompressed(version);
+        } else {
+            builder2 = builder.withVersionUncompressed(version);
         }
-        return builder.withVersion(version)
-                .forPath(resolveForSet(model), 
modelSpec.serializer().serialize(model));
+        return builder2.forPath(resolveForSet(model), 
modelSpec.serializer().serialize(model));
     }
 
     @Override
@@ -408,14 +409,23 @@ public class ModeledFrameworkImpl<T> implements 
ModeledFramework<T> {
     }
 
     private boolean isCompressed() {
-        return modelSpec.createOptions().contains(CreateOption.compress);
+        if (modelSpec.createOptions().contains(CreateOption.compress)) {
+            return true;
+        } else if 
(modelSpec.createOptions().contains(CreateOption.uncompress)) {
+            return false;
+        } else {
+            return client.unwrap().compressionEnabled();
+        }
     }
 
     private <U> ModelStage<U> internalRead(Function<ZNode<T>, U> resolver, 
Stat storingStatIn) {
         Stat stat = (storingStatIn != null) ? storingStatIn : new Stat();
-        AsyncPathable<AsyncStage<byte[]>> next = isCompressed()
-                ? watchableClient.getData().decompressedStoringStatIn(stat)
-                : watchableClient.getData().storingStatIn(stat);
+        AsyncPathable<AsyncStage<byte[]>> next;
+        if (isCompressed()) {
+            next = watchableClient.getData().decompressedStoringStatIn(stat);
+        } else {
+            next = watchableClient.getData().undecompressedStoringStatIn(stat);
+        }
         AsyncStage<byte[]> asyncStage = 
next.forPath(modelSpec.path().fullPath());
         ModelStage<U> modelStage = ModelStage.make(asyncStage.event());
         asyncStage.whenComplete((value, e) -> {
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
index 1ee7d7334..78f26fa84 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java
@@ -26,11 +26,13 @@ import static 
org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
 import static org.apache.zookeeper.CreateMode.PERSISTENT_SEQUENTIAL;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
 import org.apache.curator.framework.CuratorFramework;
@@ -38,6 +40,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.async.api.CreateOption;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
@@ -200,4 +203,185 @@ public class TestBasicOperations extends 
CompletableBaseClassForTests {
         complete(client.getData().storingStatIn(stat).forPath("/test"));
         assertEquals(stat.getDataLength(), "hey".length());
     }
+
+    @Test
+    public void testCompression() {
+        // Test create
+        byte[] data = "hey".getBytes();
+        
complete(client.create().withOptions(Collections.singleton(compress)).forPath("/test",
 data));
+
+        Stat stat = new Stat();
+        
complete(client.getData().decompressedStoringStatIn(stat).forPath("/test"), (v, 
e) -> {
+            assertNull(e);
+            assertEquals(data.length, v.length);
+        });
+        assertNotEquals(data.length, stat.getDataLength());
+        complete(client.getData().storingStatIn(stat).forPath("/test"), (v, e) 
-> {
+            assertNull(e);
+            assertNotEquals(data.length, v.length);
+        });
+        assertNotEquals(data.length, stat.getDataLength());
+        
complete(client.getData().undecompressedStoringStatIn(stat).forPath("/test"), 
(v, e) -> {
+            assertNull(e);
+            assertNotEquals(data.length, v.length);
+        });
+        assertNotEquals(data.length, stat.getDataLength());
+
+        // Test setData
+        byte[] data2 = "hey2".getBytes();
+        complete(client.setData().compressed().forPath("/test", data2));
+
+        stat = new Stat();
+        
complete(client.getData().decompressedStoringStatIn(stat).forPath("/test"), (v, 
e) -> {
+            assertNull(e);
+            assertEquals(data2.length, v.length);
+        });
+        assertNotEquals(data2.length, stat.getDataLength());
+        complete(client.getData().storingStatIn(stat).forPath("/test"), (v, e) 
-> {
+            assertNull(e);
+            assertNotEquals(data2.length, v.length);
+        });
+        assertNotEquals(data2.length, stat.getDataLength());
+        
complete(client.getData().undecompressedStoringStatIn(stat).forPath("/test"), 
(v, e) -> {
+            assertNull(e);
+            assertNotEquals(data2.length, v.length);
+        });
+        assertNotEquals(data2.length, stat.getDataLength());
+    }
+
+    @Test
+    public void testGlobalCompression() throws Exception {
+        try (CuratorFramework syncClient = CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .retryPolicy(new 
RetryOneTime(timing.forSleepingABit().milliseconds()))
+                .enableCompression()
+                .build()) {
+            syncClient.start();
+            AsyncCuratorFramework client = 
AsyncCuratorFramework.wrap(syncClient);
+
+            Stat stat = new Stat();
+            byte[] data = "hey".getBytes();
+            byte[] data2 = "hey2".getBytes();
+            String path = "/test";
+
+            // Test create with explicit compression
+            complete(
+                    
client.create().withOptions(Collections.singleton(compress)).forPath(path, 
data));
+
+            
complete(client.getData().decompressedStoringStatIn(stat).forPath(path), (v, e) 
-> {
+                assertNull(e);
+                assertEquals(data.length, v.length);
+            });
+            assertNotEquals(data.length, stat.getDataLength());
+            complete(client.getData().storingStatIn(stat).forPath(path), (v, 
e) -> {
+                assertNull(e);
+                assertEquals(data.length, v.length);
+            });
+            assertNotEquals(data.length, stat.getDataLength());
+            
complete(client.getData().undecompressedStoringStatIn(stat).forPath(path), (v, 
e) -> {
+                assertNull(e);
+                assertNotEquals(data.length, v.length);
+            });
+            assertNotEquals(data.length, stat.getDataLength());
+
+            // Test create explicitly without compression
+            syncClient.delete().forPath(path);
+            complete(client.create()
+                    
.withOptions(Collections.singleton(CreateOption.uncompress))
+                    .forPath(path, data));
+
+            
complete(client.getData().decompressedStoringStatIn(stat).forPath(path), (v, e) 
-> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            assertEquals(data.length, stat.getDataLength());
+            complete(client.getData().storingStatIn(stat).forPath(path), (v, 
e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            assertEquals(data.length, stat.getDataLength());
+            
complete(client.getData().undecompressedStoringStatIn(stat).forPath(path), (v, 
e) -> {
+                assertNull(e);
+                assertEquals(data.length, v.length);
+            });
+            assertEquals(data.length, stat.getDataLength());
+
+            // Test create with implicit (global) compression
+            syncClient.delete().forPath(path);
+            complete(client.create().forPath(path, data));
+
+            
complete(client.getData().decompressedStoringStatIn(stat).forPath(path), (v, e) 
-> {
+                assertNull(e);
+                assertEquals(data.length, v.length);
+            });
+            assertNotEquals(data.length, stat.getDataLength());
+            complete(client.getData().storingStatIn(stat).forPath(path), (v, 
e) -> {
+                assertNull(e);
+                assertEquals(data.length, v.length);
+            });
+            assertNotEquals(data.length, stat.getDataLength());
+            
complete(client.getData().undecompressedStoringStatIn(stat).forPath(path), (v, 
e) -> {
+                assertNull(e);
+                assertNotEquals(data.length, v.length);
+            });
+            assertNotEquals(data.length, stat.getDataLength());
+
+            // Test setData with explicit compression
+            complete(client.setData().compressed().forPath(path, data));
+
+            
complete(client.getData().decompressedStoringStatIn(stat).forPath(path), (v, e) 
-> {
+                assertNull(e);
+                assertEquals(data.length, v.length);
+            });
+            assertNotEquals(data.length, stat.getDataLength());
+            complete(client.getData().storingStatIn(stat).forPath(path), (v, 
e) -> {
+                assertNull(e);
+                assertEquals(data.length, v.length);
+            });
+            assertNotEquals(data.length, stat.getDataLength());
+            
complete(client.getData().undecompressedStoringStatIn(stat).forPath(path), (v, 
e) -> {
+                assertNull(e);
+                assertNotEquals(data.length, v.length);
+            });
+            assertNotEquals(data.length, stat.getDataLength());
+
+            // Test setData explicitly without compression
+            complete(client.setData().uncompressed().forPath(path, data));
+
+            
complete(client.getData().decompressedStoringStatIn(stat).forPath(path), (v, e) 
-> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            assertEquals(data.length, stat.getDataLength());
+            complete(client.getData().storingStatIn(stat).forPath(path), (v, 
e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            assertEquals(data.length, stat.getDataLength());
+            
complete(client.getData().undecompressedStoringStatIn(stat).forPath(path), (v, 
e) -> {
+                assertNull(e);
+                assertEquals(data.length, v.length);
+            });
+            assertEquals(data.length, stat.getDataLength());
+
+            // Test setData with implicit (global) compression
+            complete(client.setData().forPath(path, data));
+
+            
complete(client.getData().decompressedStoringStatIn(stat).forPath(path), (v, e) 
-> {
+                assertNull(e);
+                assertEquals(data.length, v.length);
+            });
+            assertNotEquals(data.length, stat.getDataLength());
+            complete(client.getData().storingStatIn(stat).forPath(path), (v, 
e) -> {
+                assertNull(e);
+                assertEquals(data.length, v.length);
+            });
+            assertNotEquals(data.length, stat.getDataLength());
+            
complete(client.getData().undecompressedStoringStatIn(stat).forPath(path), (v, 
e) -> {
+                assertNull(e);
+                assertNotEquals(data.length, v.length);
+            });
+            assertNotEquals(data.length, stat.getDataLength());
+        }
+    }
 }
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
index 768c90128..967b04744 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java
@@ -300,4 +300,442 @@ public class TestModeledFramework extends 
TestModeledFrameworkBase {
             assertTrue(e.getCause() instanceof 
KeeperException.NoAuthException);
         }
     }
+
+    @Test
+    public void testCompressedCreateAndRead() throws Exception {
+        try (CuratorFramework compressedRawClient =
+                createRawClientBuilder().enableCompression().build()) {
+            compressedRawClient.start();
+            AsyncCuratorFramework compressedAsync = 
AsyncCuratorFramework.wrap(compressedRawClient);
+            TestModel rawModel = new TestModel("John", "Galt", "1 Galt's 
Gulch", 42, BigInteger.valueOf(1));
+
+            // These should be compressed
+            ModeledFramework<TestModel> clientWithCompressedFramework =
+                    ModeledFramework.wrap(compressedAsync, modelSpec);
+            ModeledFramework<TestModel> clientWithCompressedModel = 
ModeledFramework.wrap(async, compressedModelSpec);
+
+            // These should be uncompressed
+            ModeledFramework<TestModel> client = ModeledFramework.wrap(async, 
modelSpec);
+            ModeledFramework<TestModel> clientWithUncompressedModel =
+                    ModeledFramework.wrap(async, uncompressedModelSpec);
+            ModeledFramework<TestModel> 
clientWithCompressedFrameworkAndUncompressedModel =
+                    ModeledFramework.wrap(compressedAsync, 
uncompressedModelSpec);
+
+            // Create with compressedFramework, read with all other clients
+            complete(clientWithCompressedFramework.set(rawModel), (path, e) -> 
assertNull(e));
+            complete(clientWithCompressedModel.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(client.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithUncompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithCompressedFrameworkAndUncompressedModel.read(), 
(model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+
+            // Create with compressedModel, read with all other clients
+            clientWithCompressedModel.delete();
+            complete(clientWithCompressedModel.set(rawModel), (path, e) -> 
assertNull(e));
+            complete(clientWithCompressedFramework.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(client.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithUncompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithCompressedFrameworkAndUncompressedModel.read(), 
(model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+
+            // Create with regular (implicitly uncompressed) client, read with 
all other clients
+            client.delete();
+            complete(client.set(rawModel), (path, e) -> assertNull(e));
+            complete(clientWithUncompressedModel.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(
+                    clientWithCompressedFrameworkAndUncompressedModel.read(),
+                    (model, e) -> assertEquals(model, rawModel));
+            complete(clientWithCompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            complete(clientWithCompressedFramework.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+
+            // Create with uncompressedModel, read with all other clients
+            clientWithUncompressedModel.delete();
+            complete(clientWithUncompressedModel.set(rawModel), (path, e) -> 
assertNull(e));
+            complete(client.read(), (model, e) -> assertEquals(model, 
rawModel));
+            complete(
+                    clientWithCompressedFrameworkAndUncompressedModel.read(),
+                    (model, e) -> assertEquals(model, rawModel));
+            complete(clientWithCompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            complete(clientWithCompressedFramework.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+
+            // Create with compressedFramework overridden by an 
uncompressedModel, read with all other clients
+            clientWithCompressedFrameworkAndUncompressedModel.delete();
+            
complete(clientWithCompressedFrameworkAndUncompressedModel.set(rawModel), 
(path, e) -> assertNull(e));
+            complete(client.read(), (model, e) -> assertEquals(model, 
rawModel));
+            complete(clientWithUncompressedModel.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(clientWithCompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            complete(clientWithCompressedFramework.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            clientWithCompressedFrameworkAndUncompressedModel.delete();
+        }
+    }
+
+    @Test
+    public void testCompressedUpdateAndRead() throws Exception {
+        try (CuratorFramework compressedRawClient =
+                createRawClientBuilder().enableCompression().build()) {
+            compressedRawClient.start();
+            AsyncCuratorFramework compressedAsync = 
AsyncCuratorFramework.wrap(compressedRawClient);
+            TestModel rawModel = new TestModel("John", "Galt", "1 Galt's 
Gulch", 42, BigInteger.valueOf(1));
+
+            // These should be compressed
+            ModeledFramework<TestModel> clientWithCompressedFramework =
+                    ModeledFramework.wrap(compressedAsync, modelSpec);
+            ModeledFramework<TestModel> clientWithCompressedModel = 
ModeledFramework.wrap(async, compressedModelSpec);
+
+            // These should be uncompressed
+            ModeledFramework<TestModel> client = ModeledFramework.wrap(async, 
modelSpec);
+            ModeledFramework<TestModel> clientWithUncompressedModel =
+                    ModeledFramework.wrap(async, uncompressedModelSpec);
+            ModeledFramework<TestModel> 
clientWithCompressedFrameworkAndUncompressedModel =
+                    ModeledFramework.wrap(compressedAsync, 
uncompressedModelSpec);
+
+            // Create the node - so we can update in each command
+            complete(client.set(rawModel), (model, e) -> assertNull(e));
+
+            // Update with compressedFramework, read with all other clients
+            complete(clientWithCompressedFramework.update(rawModel), (stat, e) 
-> assertNull(e));
+            complete(clientWithCompressedModel.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(client.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithUncompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithCompressedFrameworkAndUncompressedModel.read(), 
(model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+
+            // Update with compressedModel, read with all other clients
+            complete(clientWithCompressedModel.update(rawModel), (stat, e) -> 
assertNull(e));
+            complete(clientWithCompressedFramework.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(client.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithUncompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithCompressedFrameworkAndUncompressedModel.read(), 
(model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+
+            // Update with regular (implicitly uncompressed) client, read with 
all other clients
+            complete(client.update(rawModel), (stat, e) -> assertNull(e));
+            complete(clientWithUncompressedModel.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(
+                    clientWithCompressedFrameworkAndUncompressedModel.read(),
+                    (model, e) -> assertEquals(model, rawModel));
+            complete(clientWithCompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            complete(clientWithCompressedFramework.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+
+            // Update with uncompressedModel, read with all other clients
+            complete(clientWithUncompressedModel.update(rawModel), (stat, e) 
-> assertNull(e));
+            complete(client.read(), (model, e) -> assertEquals(model, 
rawModel));
+            complete(
+                    clientWithCompressedFrameworkAndUncompressedModel.read(),
+                    (model, e) -> assertEquals(model, rawModel));
+            complete(clientWithCompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            complete(clientWithCompressedFramework.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+
+            // Update with compressedFramework overridden by an 
uncompressedModel, read with all other clients
+            
complete(clientWithCompressedFrameworkAndUncompressedModel.update(rawModel), 
(stat, e) -> assertNull(e));
+            complete(client.read(), (model, e) -> assertEquals(model, 
rawModel));
+            complete(clientWithUncompressedModel.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(clientWithCompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            complete(clientWithCompressedFramework.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+        }
+    }
+
+    @Test
+    public void testCompressedCreateOp() throws Exception {
+        try (CuratorFramework compressedRawClient =
+                createRawClientBuilder().enableCompression().build()) {
+            compressedRawClient.start();
+            AsyncCuratorFramework compressedAsync = 
AsyncCuratorFramework.wrap(compressedRawClient);
+            TestModel rawModel = new TestModel("John", "Galt", "1 Galt's 
Gulch", 42, BigInteger.valueOf(1));
+
+            // These should be compressed
+            ModeledFramework<TestModel> clientWithCompressedFramework =
+                    ModeledFramework.wrap(compressedAsync, modelSpec);
+            ModeledFramework<TestModel> clientWithCompressedModel = 
ModeledFramework.wrap(async, compressedModelSpec);
+
+            // These should be uncompressed
+            ModeledFramework<TestModel> client = ModeledFramework.wrap(async, 
modelSpec);
+            ModeledFramework<TestModel> clientWithUncompressedModel =
+                    ModeledFramework.wrap(async, uncompressedModelSpec);
+            ModeledFramework<TestModel> 
clientWithCompressedFrameworkAndUncompressedModel =
+                    ModeledFramework.wrap(compressedAsync, 
uncompressedModelSpec);
+
+            // Make sure the parent node(s) exist
+            rawClient
+                    .create()
+                    .creatingParentsIfNeeded()
+                    .forPath(modelSpec.path().parent().fullPath());
+
+            // Create with compressedFramework, read with all other clients
+            complete(
+                    clientWithCompressedFramework.inTransaction(
+                            
Collections.singletonList(clientWithCompressedFramework.createOp(rawModel))),
+                    (results, e) -> assertNull(e));
+            complete(clientWithCompressedModel.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(client.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithUncompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithCompressedFrameworkAndUncompressedModel.read(), 
(model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+
+            // Create with compressedModel, read with all other clients
+            clientWithCompressedModel.delete();
+            complete(
+                    clientWithCompressedModel.inTransaction(
+                            
Collections.singletonList(clientWithCompressedModel.createOp(rawModel))),
+                    (results, e) -> assertNull(e));
+            complete(clientWithCompressedFramework.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(client.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithUncompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithCompressedFrameworkAndUncompressedModel.read(), 
(model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+
+            // Create with regular (implicitly uncompressed) client, read with 
all other clients
+            client.delete();
+            complete(
+                    
client.inTransaction(Collections.singletonList(client.createOp(rawModel))),
+                    (results, e) -> assertNull(e));
+            complete(clientWithUncompressedModel.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(
+                    clientWithCompressedFrameworkAndUncompressedModel.read(),
+                    (model, e) -> assertEquals(model, rawModel));
+            complete(clientWithCompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            complete(clientWithCompressedFramework.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+
+            // Create with uncompressedModel, read with all other clients
+            clientWithUncompressedModel.delete();
+            complete(
+                    clientWithUncompressedModel.inTransaction(
+                            
Collections.singletonList(clientWithUncompressedModel.createOp(rawModel))),
+                    (results, e) -> assertNull(e));
+            complete(client.read(), (model, e) -> assertEquals(model, 
rawModel));
+            complete(
+                    clientWithCompressedFrameworkAndUncompressedModel.read(),
+                    (model, e) -> assertEquals(model, rawModel));
+            complete(clientWithCompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            complete(clientWithCompressedFramework.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+
+            // Create with compressedFramework overridden by an 
uncompressedModel, read with all other clients
+            clientWithCompressedFrameworkAndUncompressedModel.delete();
+            complete(
+                    
clientWithCompressedFrameworkAndUncompressedModel.inTransaction(Collections.singletonList(
+                            
clientWithCompressedFrameworkAndUncompressedModel.createOp(rawModel))),
+                    (results, e) -> assertNull(e));
+            complete(client.read(), (model, e) -> assertEquals(model, 
rawModel));
+            complete(clientWithUncompressedModel.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(clientWithCompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            complete(clientWithCompressedFramework.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            clientWithCompressedFrameworkAndUncompressedModel.delete();
+        }
+    }
+
+    @Test
+    public void testCompressedUpdateOp() throws Exception {
+        try (CuratorFramework compressedRawClient =
+                createRawClientBuilder().enableCompression().build()) {
+            compressedRawClient.start();
+            AsyncCuratorFramework compressedAsync = 
AsyncCuratorFramework.wrap(compressedRawClient);
+            TestModel rawModel = new TestModel("John", "Galt", "1 Galt's 
Gulch", 42, BigInteger.valueOf(1));
+
+            // These should be compressed
+            ModeledFramework<TestModel> clientWithCompressedFramework =
+                    ModeledFramework.wrap(compressedAsync, modelSpec);
+            ModeledFramework<TestModel> clientWithCompressedModel = 
ModeledFramework.wrap(async, compressedModelSpec);
+
+            // These should be uncompressed
+            ModeledFramework<TestModel> client = ModeledFramework.wrap(async, 
modelSpec);
+            ModeledFramework<TestModel> clientWithUncompressedModel =
+                    ModeledFramework.wrap(async, uncompressedModelSpec);
+            ModeledFramework<TestModel> 
clientWithCompressedFrameworkAndUncompressedModel =
+                    ModeledFramework.wrap(compressedAsync, 
uncompressedModelSpec);
+
+            // Create the node - so we can update in each command
+            complete(client.set(rawModel), (model, e) -> assertNull(e));
+
+            // Update with compressedFramework, read with all other clients
+            complete(
+                    clientWithCompressedFramework.inTransaction(
+                            
Collections.singletonList(clientWithCompressedFramework.updateOp(rawModel))),
+                    (results, e) -> assertNull(e));
+            complete(clientWithCompressedModel.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(client.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithUncompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithCompressedFrameworkAndUncompressedModel.read(), 
(model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+
+            // Update with compressedModel, read with all other clients
+            complete(
+                    clientWithCompressedModel.inTransaction(
+                            
Collections.singletonList(clientWithCompressedModel.updateOp(rawModel))),
+                    (results, e) -> assertNull(e));
+            complete(clientWithCompressedFramework.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(client.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithUncompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+            complete(clientWithCompressedFrameworkAndUncompressedModel.read(), 
(model, e) -> {
+                assertNotNull(e);
+                assertEquals(RuntimeException.class, e.getClass());
+            });
+
+            // Update with regular (implicitly uncompressed) client, read with 
all other clients
+            complete(
+                    
client.inTransaction(Collections.singletonList(client.updateOp(rawModel))),
+                    (results, e) -> assertNull(e));
+            complete(clientWithUncompressedModel.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(
+                    clientWithCompressedFrameworkAndUncompressedModel.read(),
+                    (model, e) -> assertEquals(model, rawModel));
+            complete(clientWithCompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            complete(clientWithCompressedFramework.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+
+            // Update with uncompressedModel, read with all other clients
+            complete(
+                    clientWithUncompressedModel.inTransaction(
+                            
Collections.singletonList(clientWithUncompressedModel.updateOp(rawModel))),
+                    (results, e) -> assertNull(e));
+            complete(client.read(), (model, e) -> assertEquals(model, 
rawModel));
+            complete(
+                    clientWithCompressedFrameworkAndUncompressedModel.read(),
+                    (model, e) -> assertEquals(model, rawModel));
+            complete(clientWithCompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            complete(clientWithCompressedFramework.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+
+            // Update with compressedFramework overridden by an 
uncompressedModel, read with all other clients
+            complete(
+                    
clientWithCompressedFrameworkAndUncompressedModel.inTransaction(Collections.singletonList(
+                            
clientWithCompressedFrameworkAndUncompressedModel.updateOp(rawModel))),
+                    (results, e) -> assertNull(e));
+            complete(client.read(), (model, e) -> assertEquals(model, 
rawModel));
+            complete(clientWithUncompressedModel.read(), (model, e) -> 
assertEquals(model, rawModel));
+            complete(clientWithCompressedModel.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+            complete(clientWithCompressedFramework.read(), (model, e) -> {
+                assertNotNull(e);
+                assertEquals(KeeperException.DataInconsistencyException.class, 
e.getClass());
+            });
+        }
+    }
 }
diff --git 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
index 2941ef33a..a33a1d5ec 100644
--- 
a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
+++ 
b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java
@@ -19,12 +19,14 @@
 
 package org.apache.curator.x.async.modeled;
 
+import java.util.Collections;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.curator.x.async.CompletableBaseClassForTests;
+import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.modeled.models.TestModel;
 import org.apache.curator.x.async.modeled.models.TestNewerModel;
 import org.junit.jupiter.api.AfterEach;
@@ -35,15 +37,24 @@ public class TestModeledFrameworkBase extends 
CompletableBaseClassForTests {
     protected CuratorFramework rawClient;
     protected ModelSpec<TestModel> modelSpec;
     protected ModelSpec<TestNewerModel> newModelSpec;
+    protected ModelSpec<TestModel> compressedModelSpec;
+    protected ModelSpec<TestModel> uncompressedModelSpec;
     protected AsyncCuratorFramework async;
 
+    public CuratorFrameworkFactory.Builder createRawClientBuilder() {
+        return CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .retryPolicy(new RetryOneTime(1))
+                .sessionTimeoutMs(timing.session())
+                .connectionTimeoutMs(timing.connection());
+    }
+
     @BeforeEach
     @Override
     public void setup() throws Exception {
         super.setup();
 
-        rawClient = CuratorFrameworkFactory.newClient(
-                server.getConnectString(), timing.session(), 
timing.connection(), new RetryOneTime(1));
+        rawClient = createRawClientBuilder().build();
         rawClient.start();
         async = AsyncCuratorFramework.wrap(rawClient);
 
@@ -52,6 +63,12 @@ public class TestModeledFrameworkBase extends 
CompletableBaseClassForTests {
 
         modelSpec = ModelSpec.builder(path, serializer).build();
         newModelSpec = ModelSpec.builder(path, newSerializer).build();
+        compressedModelSpec = ModelSpec.builder(path, serializer)
+                
.withCreateOptions(Collections.singleton(CreateOption.compress))
+                .build();
+        uncompressedModelSpec = ModelSpec.builder(path, serializer)
+                
.withCreateOptions(Collections.singleton(CreateOption.uncompress))
+                .build();
     }
 
     @AfterEach

Reply via email to