This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit c171830a61af71fbcadc3d4ba2628643b179e097
Author: Prashant Singh <[email protected]>
AuthorDate: Fri Sep 2 04:07:16 2022 +0530

    Core, AWS: Fix Kryo serialization failure for FileIO (#5437)
---
 .../test/java/org/apache/iceberg/TestHelpers.java  | 35 +++++++++++++++
 .../java/org/apache/iceberg/aws/s3/S3FileIO.java   | 11 ++---
 .../org/apache/iceberg/aws/s3/TestS3FileIO.java    | 23 ++++++++++
 build.gradle                                       |  4 ++
 .../org/apache/iceberg/io/ResolvingFileIO.java     |  7 +--
 .../apache/iceberg/hadoop/HadoopFileIOTest.java    | 26 +++++++++++
 .../org/apache/iceberg/io/TestResolvingIO.java     | 52 ++++++++++++++++++++++
 .../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java | 20 ++++++---
 .../org/apache/iceberg/gcp/gcs/GCSFileIOTest.java  | 26 +++++++++++
 versions.props                                     |  1 +
 10 files changed, 190 insertions(+), 15 deletions(-)

diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index f82269060e..77e7acf2f6 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -19,11 +19,16 @@
 
 package org.apache.iceberg;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.ClosureSerializer;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.invoke.SerializedLambda;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
@@ -36,6 +41,7 @@ import org.apache.iceberg.expressions.ExpressionVisitors;
 import org.apache.iceberg.expressions.UnboundPredicate;
 import org.apache.iceberg.util.ByteBuffers;
 import org.junit.Assert;
+import org.objenesis.strategy.StdInstantiatorStrategy;
 
 public class TestHelpers {
 
@@ -147,6 +153,35 @@ public class TestHelpers {
     });
   }
 
+  public static class KryoHelpers {
+    private KryoHelpers() {
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> T roundTripSerialize(T obj) throws IOException {
+      Kryo kryo = new Kryo();
+
+      // required for avoiding requirement of zero arg constructor
+      kryo.setInstantiatorStrategy(
+          new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
+
+      // required for serializing and deserializing $$Lambda$ Anonymous Classes
+      kryo.register(SerializedLambda.class);
+      kryo.register(ClosureSerializer.Closure.class, new ClosureSerializer());
+
+      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+
+      try (Output out = new Output(new ObjectOutputStream(bytes))) {
+        kryo.writeClassAndObject(out, obj);
+      }
+
+      try (Input in =
+          new Input(new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))) {
+        return (T) kryo.readClassAndObject(in);
+      }
+    }
+  }
+
   private static class CheckReferencesBound extends ExpressionVisitors.ExpressionVisitor<Void> {
     private final String message;
 
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 5d5d88eafb..917b25af48 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -45,6 +45,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
 import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
@@ -79,7 +80,7 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
   private String credential = null;
   private SerializableSupplier<S3Client> s3;
   private AwsProperties awsProperties;
-  private Map<String, String> properties = null;
+  private SerializableMap<String, String> properties = null;
   private transient volatile S3Client client;
   private MetricsContext metrics = MetricsContext.nullMetrics();
   private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
@@ -152,7 +153,7 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
 
   @Override
   public Map<String, String> properties() {
-    return properties;
+    return properties.immutableMap();
   }
 
   /**
@@ -314,8 +315,8 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
 
   @Override
   public void initialize(Map<String, String> props) {
-    this.awsProperties = new AwsProperties(props);
-    this.properties = props;
+    this.properties = SerializableMap.copyOf(props);
+    this.awsProperties = new AwsProperties(properties);
 
     // Do not override s3 client if it was provided
     if (s3 == null) {
@@ -334,7 +335,7 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO
               .hiddenImpl(DEFAULT_METRICS_IMPL, String.class)
               .buildChecked();
       MetricsContext context = ctor.newInstance("s3");
-      context.initialize(props);
+      context.initialize(properties);
       this.metrics = context;
     } catch (NoClassDefFoundError | NoSuchMethodException | ClassCastException e) {
       LOG.warn("Unable to load metrics class: '{}', falling back to null metrics", DEFAULT_METRICS_IMPL, e);
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
index 972df9da89..4d4d06c84c 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java
@@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.io.BulkDeletionFailureException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.FileIOParser;
@@ -244,6 +245,28 @@ public class TestS3FileIO {
     }
   }
 
+  @Test
+  public void testS3FileIOKryoSerialization() throws IOException {
+    FileIO testS3FileIO = new S3FileIO();
+
+    // s3 fileIO should be serializable when properties are passed as immutable map
+    testS3FileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.KryoHelpers.roundTripSerialize(testS3FileIO);
+
+    Assert.assertEquals(testS3FileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+
+  @Test
+  public void testS3FileIOJavaSerialization() throws IOException, ClassNotFoundException {
+    FileIO testS3FileIO = new S3FileIO();
+
+    // s3 fileIO should be serializable when properties are passed as immutable map
+    testS3FileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testS3FileIO);
+
+    Assert.assertEquals(testS3FileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+
   private void createRandomObjects(String prefix, int count) {
     S3URI s3URI = new S3URI(prefix);
 
diff --git a/build.gradle b/build.gradle
index d9a6effddc..3a6468ba3c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -222,6 +222,7 @@ project(':iceberg-api') {
     implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
     compileOnly "com.google.errorprone:error_prone_annotations"
     testImplementation "org.apache.avro:avro"
+    testImplementation "com.esotericsoftware:kryo"
   }
 
   tasks.processTestResources.dependsOn rootProject.tasks.buildInfo
@@ -269,6 +270,7 @@ project(':iceberg-core') {
     testImplementation 'org.mock-server:mockserver-client-java'
     testImplementation "org.xerial:sqlite-jdbc"
     testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
+    testImplementation "com.esotericsoftware:kryo"
   }
 }
 
@@ -381,6 +383,7 @@ project(':iceberg-aws') {
       exclude module: "logback-classic"
       exclude group: 'junit'
     }
+    testImplementation "com.esotericsoftware:kryo"
   }
 
   sourceSets {
@@ -423,6 +426,7 @@ project(':iceberg-gcp') {
       exclude group: 'javax.servlet', module: 'servlet-api'
       exclude group: 'com.google.code.gson', module: 'gson'
     }
+    testImplementation "com.esotericsoftware:kryo"
   }
 }
 
diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index 3815d5da54..8a29cadf27 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.hadoop.SerializableConfiguration;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable {
   );
 
   private final Map<String, FileIO> ioInstances = Maps.newHashMap();
-  private Map<String, String> properties;
+  private SerializableMap<String, String> properties;
   private SerializableSupplier<Configuration> hadoopConf;
 
   /**
@@ -80,13 +81,13 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable {
 
   @Override
   public Map<String, String> properties() {
-    return properties;
+    return properties.immutableMap();
   }
 
   @Override
   public void initialize(Map<String, String> newProperties) {
     close(); // close and discard any existing FileIO instances
-    this.properties = newProperties;
+    this.properties = SerializableMap.copyOf(newProperties);
   }
 
   @Override
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
index 0721d69997..12de675dd5 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java
@@ -27,8 +27,12 @@ import java.util.Random;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.junit.Assert;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -91,6 +95,28 @@ public class HadoopFileIOTest {
     assertThrows(UncheckedIOException.class, () -> hadoopFileIO.listPrefix(parent.toUri().toString()).iterator());
   }
 
+  @Test
+  public void testHadoopFileIOKryoSerialization() throws IOException {
+    FileIO testHadoopFileIO = new HadoopFileIO();
+
+    // hadoop fileIO should be serializable when properties are passed as immutable map
+    testHadoopFileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.KryoHelpers.roundTripSerialize(testHadoopFileIO);
+
+    Assert.assertEquals(testHadoopFileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+
+  @Test
+  public void testHadoopFileIOJavaSerialization() throws IOException, ClassNotFoundException {
+    FileIO testHadoopFileIO = new HadoopFileIO();
+
+    // hadoop fileIO should be serializable when properties are passed as immutable map
+    testHadoopFileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testHadoopFileIO);
+
+    Assert.assertEquals(testHadoopFileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+
   private void createRandomFiles(Path parent, int count) {
     random.ints(count).parallel().forEach(i -> {
           try {
diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
new file mode 100644
index 0000000000..e79c31db1b
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.io;
+
+import java.io.IOException;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestResolvingIO {
+
+  @Test
+  public void testResolvingFileIOKryoSerialization() throws IOException {
+    FileIO testResolvingFileIO = new ResolvingFileIO();
+
+    // resolving fileIO should be serializable when properties are passed as immutable map
+    testResolvingFileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO =
+        TestHelpers.KryoHelpers.roundTripSerialize(testResolvingFileIO);
+
+    Assert.assertEquals(testResolvingFileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+
+  @Test
+  public void testResolvingFileIOJavaSerialization() throws IOException, ClassNotFoundException {
+    FileIO testResolvingFileIO = new ResolvingFileIO();
+
+    // resolving fileIO should be serializable when properties are passed as immutable map
+    testResolvingFileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testResolvingFileIO);
+
+    Assert.assertEquals(testResolvingFileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+}
diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
index ecb520f1d2..7bc37edc92 100644
--- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
+++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.metrics.MetricsContext;
+import org.apache.iceberg.util.SerializableMap;
 import org.apache.iceberg.util.SerializableSupplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +53,7 @@ public class GCSFileIO implements FileIO {
   private transient volatile Storage storage;
   private MetricsContext metrics = MetricsContext.nullMetrics();
   private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
-  private Map<String, String> properties = null;
+  private SerializableMap<String, String> properties = null;
 
   /**
    * No-arg constructor to load the FileIO dynamically.
@@ -102,7 +103,7 @@ public class GCSFileIO implements FileIO {
 
   @Override
   public Map<String, String> properties() {
-    return properties;
+    return properties.immutableMap();
   }
 
   private Storage client() {
@@ -118,8 +119,8 @@ public class GCSFileIO implements FileIO {
 
   @Override
   public void initialize(Map<String, String> props) {
-    this.properties = props;
-    this.gcpProperties = new GCPProperties(props);
+    this.properties = SerializableMap.copyOf(props);
+    this.gcpProperties = new GCPProperties(properties);
 
     this.storageSupplier = () -> {
       StorageOptions.Builder builder = StorageOptions.newBuilder();
@@ -131,12 +132,17 @@ public class GCSFileIO implements FileIO {
       // Report Hadoop metrics if Hadoop is available
       try {
         DynConstructors.Ctor<MetricsContext> ctor =
-            DynConstructors.builder(MetricsContext.class).hiddenImpl(DEFAULT_METRICS_IMPL, String.class).buildChecked();
+            DynConstructors.builder(MetricsContext.class)
+                .hiddenImpl(DEFAULT_METRICS_IMPL, String.class)
+                .buildChecked();
         MetricsContext context = ctor.newInstance("gcs");
-        context.initialize(props);
+        context.initialize(properties);
         this.metrics = context;
       } catch (NoClassDefFoundError | NoSuchMethodException | ClassCastException e) {
-        LOG.warn("Unable to load metrics class: '{}', falling back to null metrics", DEFAULT_METRICS_IMPL, e);
+        LOG.warn(
+            "Unable to load metrics class: '{}', falling back to null metrics",
+            DEFAULT_METRICS_IMPL,
+            e);
       }
 
       return builder.build().getService();
diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
index 99bb08d4bf..2d31d74c6d 100644
--- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
+++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java
@@ -28,9 +28,13 @@ import java.io.OutputStream;
 import java.util.Random;
 import java.util.stream.StreamSupport;
 import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -93,4 +97,26 @@ public class GCSFileIOTest {
     assertThat(StreamSupport.stream(storage.list(TEST_BUCKET).iterateAll().spliterator(), false).count())
         .isZero();
   }
+
+  @Test
+  public void testGCSFileIOKryoSerialization() throws IOException {
+    FileIO testGCSFileIO = new GCSFileIO();
+
+    // gcs fileIO should be serializable when properties are passed as immutable map
+    testGCSFileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.KryoHelpers.roundTripSerialize(testGCSFileIO);
+
+    Assert.assertEquals(testGCSFileIO.properties(), roundTripSerializedFileIO.properties());
+  }
+
+  @Test
+  public void testGCSFileIOJavaSerialization() throws IOException, ClassNotFoundException {
+    FileIO testGCSFileIO = new GCSFileIO();
+
+    // gcs fileIO should be serializable when properties are passed as immutable map
+    testGCSFileIO.initialize(ImmutableMap.of("k1", "v1"));
+    FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testGCSFileIO);
+
+    Assert.assertEquals(testGCSFileIO.properties(), roundTripSerializedFileIO.properties());
+  }
 }
diff --git a/versions.props b/versions.props
index 69a08ca22b..c6e157193b 100644
--- a/versions.props
+++ b/versions.props
@@ -44,3 +44,4 @@ org.springframework:* = 5.3.9
 org.springframework.boot:* = 2.5.4
 org.mock-server:mockserver-netty = 5.11.1
 org.mock-server:mockserver-client-java = 5.11.1
+com.esotericsoftware:kryo = 4.0.2

Reply via email to