This is an automated email from the ASF dual-hosted git repository.

dpitkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e50097ba4 GH-43469: [Java] Change the default CompressionCodec.Factory to leverage compression support transparently (#43471)
7e50097ba4 is described below

commit 7e50097ba4239cf9368b77f438d877d4176141c9
Author: Costi Ciudatu <[email protected]>
AuthorDate: Tue Jul 30 22:30:57 2024 +0300

    GH-43469: [Java] Change the default CompressionCodec.Factory to leverage compression support transparently (#43471)
    
    
    
    ### Rationale for this change
    
    Allow Flight RPC and other components that read Arrow IPC data to use compression simply by including the `arrow-compression` jar on the module path (or classpath).
    
    ### What changes are included in this PR?
    
    Change the default compression factory to the new `CompressionCodec.Factory.INSTANCE`, a ServiceLoader-backed singleton that delegates, for each codec type, to the best-suited implementation available on the module path or classpath.
    
    ### Are these changes tested?
    
    Yes, see the new `TestCompressionCodecServiceProvider`.
    
    ### Are there any user-facing changes?
    
    No.
    * GitHub Issue: #43469
    
    Authored-by: Costi Ciudatu <[email protected]>
    Signed-off-by: Dane Pitkin <[email protected]>
---
 java/compression/src/main/java/module-info.java    |  6 +++
 ...row.vector.compression.CompressionCodec$Factory | 15 +++++++
 .../TestCompressionCodecServiceProvider.java       | 50 ++++++++++++++++++++++
 java/vector/src/main/java/module-info.java         |  2 +
 .../java/org/apache/arrow/vector/VectorLoader.java |  2 +-
 .../arrow/vector/compression/CompressionCodec.java | 44 +++++++++++++++++++
 .../apache/arrow/vector/ipc/ArrowFileReader.java   |  3 +-
 .../org/apache/arrow/vector/ipc/ArrowReader.java   |  3 +-
 .../apache/arrow/vector/ipc/ArrowStreamReader.java |  3 +-
 9 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/java/compression/src/main/java/module-info.java b/java/compression/src/main/java/module-info.java
index 6bf989e4c1..113a1dba9d 100644
--- a/java/compression/src/main/java/module-info.java
+++ b/java/compression/src/main/java/module-info.java
@@ -15,6 +15,8 @@
  * limitations under the License.
  */
 
+import org.apache.arrow.vector.compression.CompressionCodec;
+
 module org.apache.arrow.compression {
   exports org.apache.arrow.compression;
 
@@ -22,4 +24,8 @@ module org.apache.arrow.compression {
   requires org.apache.arrow.memory.core;
   requires org.apache.arrow.vector;
   requires org.apache.commons.compress;
+
+  // Also defined under META-INF/services to support non-modular applications
+  provides CompressionCodec.Factory with
+      org.apache.arrow.compression.CommonsCompressionFactory;
 }
diff --git a/java/compression/src/main/resources/META-INF/services/org.apache.arrow.vector.compression.CompressionCodec$Factory b/java/compression/src/main/resources/META-INF/services/org.apache.arrow.vector.compression.CompressionCodec$Factory
new file mode 100644
index 0000000000..ccdcef9aed
--- /dev/null
+++ b/java/compression/src/main/resources/META-INF/services/org.apache.arrow.vector.compression.CompressionCodec$Factory
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.arrow.compression.CommonsCompressionFactory
diff --git a/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodecServiceProvider.java b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodecServiceProvider.java
new file mode 100644
index 0000000000..795e05d7cb
--- /dev/null
+++ b/java/compression/src/test/java/org/apache/arrow/compression/TestCompressionCodecServiceProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.compression;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+import org.junit.jupiter.api.Test;
+
+public class TestCompressionCodecServiceProvider {
+
+  /**
+   * When arrow-compression is in the classpath/module-path, {@link
+   * CompressionCodec.Factory#INSTANCE} should be able to handle all codec 
types.
+   */
+  @Test
+  public void testSupportedCompressionTypes() {
+    assertThrows( // no-compression doesn't support any actual compression 
types
+        IllegalArgumentException.class,
+        () -> checkAllCodecTypes(NoCompressionCodec.Factory.INSTANCE));
+    assertThrows( // commons-compression doesn't support the uncompressed type
+        IllegalArgumentException.class,
+        () -> checkAllCodecTypes(CommonsCompressionFactory.INSTANCE));
+    checkAllCodecTypes( // and the winner is...
+        CompressionCodec.Factory.INSTANCE); // combines the two above to 
support all types
+  }
+
+  private void checkAllCodecTypes(CompressionCodec.Factory factory) {
+    for (CompressionUtil.CodecType codecType : 
CompressionUtil.CodecType.values()) {
+      assertNotNull(factory.createCodec(codecType));
+    }
+  }
+}
diff --git a/java/vector/src/main/java/module-info.java b/java/vector/src/main/java/module-info.java
index 73af2d1b67..fdea2bd067 100644
--- a/java/vector/src/main/java/module-info.java
+++ b/java/vector/src/main/java/module-info.java
@@ -47,4 +47,6 @@ module org.apache.arrow.vector {
   requires org.apache.arrow.memory.core;
   requires org.apache.commons.codec;
   requires org.slf4j;
+
+  uses org.apache.arrow.vector.compression.CompressionCodec.Factory;
 }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
index c076161bc2..ecd3fb9124 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -50,7 +50,7 @@ public class VectorLoader {
    * @param root the root to add vectors to based on schema
    */
   public VectorLoader(VectorSchemaRoot root) {
-    this(root, NoCompressionCodec.Factory.INSTANCE);
+    this(root, CompressionCodec.Factory.INSTANCE);
   }
 
   /**
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java
index 2de8ff2465..dd62108a84 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionCodec.java
@@ -16,6 +16,9 @@
  */
 package org.apache.arrow.vector.compression;
 
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.ServiceLoader;
 import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 
@@ -51,11 +54,52 @@ public interface CompressionCodec {
 
   /** Factory to create compression codec. */
   interface Factory {
+    /**
+     * This combines all the available factories registered as service 
providers in the module path.
+     * For each {@link CompressionUtil.CodecType compression codec type}, it 
will use whatever
+     * factory supports it, i.e. doesn't throw on `createCodec(type)`. If 
multiple factories
+     * registered as service providers support the same codec type, the first 
one encountered while
+     * iterating over the {@link ServiceLoader} will be selected. A codec type 
that is not supported
+     * by any registered service provider will fall back to {@link
+     * NoCompressionCodec.Factory#INSTANCE} for backwards compatibility.
+     */
+    Factory INSTANCE = bestEffort();
 
     /** Creates the codec based on the codec type. */
     CompressionCodec createCodec(CompressionUtil.CodecType codecType);
 
     /** Creates the codec based on the codec type and compression level. */
     CompressionCodec createCodec(CompressionUtil.CodecType codecType, int 
compressionLevel);
+
+    private static Factory bestEffort() {
+      final ServiceLoader<Factory> serviceLoader = 
ServiceLoader.load(Factory.class);
+      final Map<CompressionUtil.CodecType, Factory> factories =
+          new EnumMap<>(CompressionUtil.CodecType.class);
+      for (Factory factory : serviceLoader) {
+        for (CompressionUtil.CodecType codecType : 
CompressionUtil.CodecType.values()) {
+          try {
+            factory.createCodec(codecType); // will throw if not supported
+            factories.putIfAbsent(codecType, factory);
+          } catch (Throwable ignored) {
+          }
+        }
+      }
+
+      final Factory fallback = NoCompressionCodec.Factory.INSTANCE;
+      return new Factory() {
+        @Override
+        public CompressionCodec createCodec(CompressionUtil.CodecType 
codecType) {
+          return factories.getOrDefault(codecType, 
fallback).createCodec(codecType);
+        }
+
+        @Override
+        public CompressionCodec createCodec(
+            CompressionUtil.CodecType codecType, int compressionLevel) {
+          return factories
+              .getOrDefault(codecType, fallback)
+              .createCodec(codecType, compressionLevel);
+        }
+      };
+    }
   }
 }
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
index 982651b2ff..7cac0a15a1 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowFileReader.java
@@ -27,7 +27,6 @@ import org.apache.arrow.flatbuf.Footer;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.util.VisibleForTesting;
 import org.apache.arrow.vector.compression.CompressionCodec;
-import org.apache.arrow.vector.compression.NoCompressionCodec;
 import org.apache.arrow.vector.ipc.message.ArrowBlock;
 import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
 import org.apache.arrow.vector.ipc.message.ArrowFooter;
@@ -64,7 +63,7 @@ public class ArrowFileReader extends ArrowReader {
   }
 
   public ArrowFileReader(SeekableReadChannel in, BufferAllocator allocator) {
-    this(in, allocator, NoCompressionCodec.Factory.INSTANCE);
+    this(in, allocator, CompressionCodec.Factory.INSTANCE);
   }
 
   public ArrowFileReader(SeekableByteChannel in, BufferAllocator allocator) {
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
index 15ade38cd3..7f4addf2d0 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java
@@ -28,7 +28,6 @@ import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.compression.CompressionCodec;
-import org.apache.arrow.vector.compression.NoCompressionCodec;
 import org.apache.arrow.vector.dictionary.Dictionary;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
@@ -50,7 +49,7 @@ public abstract class ArrowReader implements DictionaryProvider, AutoCloseable {
   private final CompressionCodec.Factory compressionFactory;
 
   protected ArrowReader(BufferAllocator allocator) {
-    this(allocator, NoCompressionCodec.Factory.INSTANCE);
+    this(allocator, CompressionCodec.Factory.INSTANCE);
   }
 
   protected ArrowReader(BufferAllocator allocator, CompressionCodec.Factory 
compressionFactory) {
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
index 660c6a5f89..69811dc717 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java
@@ -25,7 +25,6 @@ import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.compression.CompressionCodec;
-import org.apache.arrow.vector.compression.NoCompressionCodec;
 import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.ipc.message.MessageChannelReader;
@@ -65,7 +64,7 @@ public class ArrowStreamReader extends ArrowReader {
    * @param allocator to allocate new buffers
    */
   public ArrowStreamReader(MessageChannelReader messageReader, BufferAllocator 
allocator) {
-    this(messageReader, allocator, NoCompressionCodec.Factory.INSTANCE);
+    this(messageReader, allocator, CompressionCodec.Factory.INSTANCE);
   }
 
   /**

Reply via email to