This is an automated email from the ASF dual-hosted git repository. blue pushed a commit to branch 0.14.x in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit c171830a61af71fbcadc3d4ba2628643b179e097 Author: Prashant Singh <[email protected]> AuthorDate: Fri Sep 2 04:07:16 2022 +0530 Core, AWS: Fix Kryo serialization failure for FileIO (#5437) --- .../test/java/org/apache/iceberg/TestHelpers.java | 35 +++++++++++++++ .../java/org/apache/iceberg/aws/s3/S3FileIO.java | 11 ++--- .../org/apache/iceberg/aws/s3/TestS3FileIO.java | 23 ++++++++++ build.gradle | 4 ++ .../org/apache/iceberg/io/ResolvingFileIO.java | 7 +-- .../apache/iceberg/hadoop/HadoopFileIOTest.java | 26 +++++++++++ .../org/apache/iceberg/io/TestResolvingIO.java | 52 ++++++++++++++++++++++ .../java/org/apache/iceberg/gcp/gcs/GCSFileIO.java | 20 ++++++--- .../org/apache/iceberg/gcp/gcs/GCSFileIOTest.java | 26 +++++++++++ versions.props | 1 + 10 files changed, 190 insertions(+), 15 deletions(-) diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index f82269060e..77e7acf2f6 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -19,11 +19,16 @@ package org.apache.iceberg; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.ClosureSerializer; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.lang.invoke.SerializedLambda; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; @@ -36,6 +41,7 @@ import org.apache.iceberg.expressions.ExpressionVisitors; import org.apache.iceberg.expressions.UnboundPredicate; import org.apache.iceberg.util.ByteBuffers; import org.junit.Assert; +import org.objenesis.strategy.StdInstantiatorStrategy; public class TestHelpers { @@ -147,6 +153,35 @@ public class TestHelpers { }); } + public static class KryoHelpers { + private KryoHelpers() { + } + + @SuppressWarnings("unchecked") + public static <T> T roundTripSerialize(T obj) throws IOException { + Kryo kryo = new Kryo(); + + // required for avoiding requirement of zero arg constructor + kryo.setInstantiatorStrategy( + new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); + + // required for serializing and deserializing $$Lambda$ Anonymous Classes + kryo.register(SerializedLambda.class); + kryo.register(ClosureSerializer.Closure.class, new ClosureSerializer()); + + ByteArrayOutputStream bytes = new ByteArrayOutputStream(); + + try (Output out = new Output(new ObjectOutputStream(bytes))) { + kryo.writeClassAndObject(out, obj); + } + + try (Input in = + new Input(new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())))) { + return (T) kryo.readClassAndObject(in); + } + } + } + private static class CheckReferencesBound extends ExpressionVisitors.ExpressionVisitor<Void> { private final String message; diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java index 5d5d88eafb..917b25af48 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java +++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java @@ -45,6 +45,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; import org.apache.iceberg.relocated.com.google.common.collect.SetMultimap; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; @@ -79,7 +80,7 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO private String credential = null; private SerializableSupplier<S3Client> s3; private AwsProperties awsProperties; - private Map<String, String> properties = null; + private SerializableMap<String, String> properties = null; private transient volatile S3Client client; private MetricsContext metrics = MetricsContext.nullMetrics(); private final AtomicBoolean isResourceClosed = new AtomicBoolean(false); @@ -152,7 +153,7 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO @Override public Map<String, String> properties() { - return properties; + return properties.immutableMap(); } /** @@ -314,8 +315,8 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO @Override public void initialize(Map<String, String> props) { - this.awsProperties = new AwsProperties(props); - this.properties = props; + this.properties = SerializableMap.copyOf(props); + this.awsProperties = new AwsProperties(properties); // Do not override s3 client if it was provided if (s3 == null) { @@ -334,7 +335,7 @@ public class S3FileIO implements FileIO, SupportsBulkOperations, SupportsPrefixO .hiddenImpl(DEFAULT_METRICS_IMPL, String.class) .buildChecked(); MetricsContext context = ctor.newInstance("s3"); - context.initialize(props); + context.initialize(properties); this.metrics = context; } catch (NoClassDefFoundError | NoSuchMethodException | ClassCastException e) { LOG.warn("Unable to load metrics class: '{}', falling back to null metrics", DEFAULT_METRICS_IMPL, e); diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java index 972df9da89..4d4d06c84c 100644 --- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java +++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java @@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.SerializationUtils; import org.apache.hadoop.conf.Configurable; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.FileIOParser; @@ -244,6 +245,28 @@ public class TestS3FileIO { } } + @Test + public void testS3FileIOKryoSerialization() throws IOException { + FileIO testS3FileIO = new S3FileIO(); + + // s3 fileIO should be serializable when properties are passed as immutable map + testS3FileIO.initialize(ImmutableMap.of("k1", "v1")); + FileIO roundTripSerializedFileIO = TestHelpers.KryoHelpers.roundTripSerialize(testS3FileIO); + + Assert.assertEquals(testS3FileIO.properties(), roundTripSerializedFileIO.properties()); + } + + @Test + public void testS3FileIOJavaSerialization() throws IOException, ClassNotFoundException { + FileIO testS3FileIO = new S3FileIO(); + + // s3 fileIO should be serializable when properties are passed as immutable map + testS3FileIO.initialize(ImmutableMap.of("k1", "v1")); + FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testS3FileIO); + + Assert.assertEquals(testS3FileIO.properties(), roundTripSerializedFileIO.properties()); + } + private void createRandomObjects(String prefix, int count) { S3URI s3URI = new S3URI(prefix); diff --git a/build.gradle b/build.gradle index d9a6effddc..3a6468ba3c 100644 --- a/build.gradle +++ b/build.gradle @@ -222,6 +222,7 @@ project(':iceberg-api') { implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') compileOnly "com.google.errorprone:error_prone_annotations" testImplementation "org.apache.avro:avro" + testImplementation "com.esotericsoftware:kryo" } tasks.processTestResources.dependsOn rootProject.tasks.buildInfo @@ -269,6 +270,7 @@ project(':iceberg-core') { testImplementation 'org.mock-server:mockserver-client-java' testImplementation "org.xerial:sqlite-jdbc" testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') + testImplementation "com.esotericsoftware:kryo" } } @@ -381,6 +383,7 @@ project(':iceberg-aws') { exclude module: "logback-classic" exclude group: 'junit' } + testImplementation "com.esotericsoftware:kryo" } sourceSets { @@ -423,6 +426,7 @@ project(':iceberg-gcp') { exclude group: 'javax.servlet', module: 'servlet-api' exclude group: 'com.google.code.gson', module: 'gson' } + testImplementation "com.esotericsoftware:kryo" } } diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java index 3815d5da54..8a29cadf27 100644 --- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java +++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java @@ -29,6 +29,7 @@ import org.apache.iceberg.hadoop.SerializableConfiguration; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +48,7 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable { ); private final Map<String, FileIO> ioInstances = Maps.newHashMap(); - private Map<String, String> properties; + private SerializableMap<String, String> properties; private SerializableSupplier<Configuration> hadoopConf; /** @@ -80,13 +81,13 @@ public class ResolvingFileIO implements FileIO, HadoopConfigurable { @Override public Map<String, String> properties() { - return properties; + return properties.immutableMap(); } @Override public void initialize(Map<String, String> newProperties) { close(); // close and discard any existing FileIO instances - this.properties = newProperties; + this.properties = SerializableMap.copyOf(newProperties); } @Override diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java index 0721d69997..12de675dd5 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopFileIOTest.java @@ -27,8 +27,12 @@ import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Streams; +import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -91,6 +95,28 @@ public class HadoopFileIOTest { assertThrows(UncheckedIOException.class, () -> hadoopFileIO.listPrefix(parent.toUri().toString()).iterator()); } + @Test + public void testHadoopFileIOKryoSerialization() throws IOException { + FileIO testHadoopFileIO = new HadoopFileIO(); + + // hadoop fileIO should be serializable when properties are passed as immutable map + testHadoopFileIO.initialize(ImmutableMap.of("k1", "v1")); + FileIO roundTripSerializedFileIO = TestHelpers.KryoHelpers.roundTripSerialize(testHadoopFileIO); + + Assert.assertEquals(testHadoopFileIO.properties(), roundTripSerializedFileIO.properties()); + } + + @Test + public void testHadoopFileIOJavaSerialization() throws IOException, ClassNotFoundException { + FileIO testHadoopFileIO = new HadoopFileIO(); + + // hadoop fileIO should be serializable when properties are passed as immutable map + testHadoopFileIO.initialize(ImmutableMap.of("k1", "v1")); + FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testHadoopFileIO); + + Assert.assertEquals(testHadoopFileIO.properties(), roundTripSerializedFileIO.properties()); + } + private void createRandomFiles(Path parent, int count) { random.ints(count).parallel().forEach(i -> { try { diff --git a/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java new file mode 100644 index 0000000000..e79c31db1b --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io; + +import java.io.IOException; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.Test; + +public class TestResolvingIO { + + @Test + public void testResolvingFileIOKryoSerialization() throws IOException { + FileIO testResolvingFileIO = new ResolvingFileIO(); + + // resolving fileIO should be serializable when properties are passed as immutable map + testResolvingFileIO.initialize(ImmutableMap.of("k1", "v1")); + FileIO roundTripSerializedFileIO = + TestHelpers.KryoHelpers.roundTripSerialize(testResolvingFileIO); + + Assert.assertEquals(testResolvingFileIO.properties(), roundTripSerializedFileIO.properties()); + } + + @Test + public void testResolvingFileIOJavaSerialization() throws IOException, ClassNotFoundException { + FileIO testResolvingFileIO = new ResolvingFileIO(); + + // resolving fileIO should be serializable when properties are passed as immutable map + testResolvingFileIO.initialize(ImmutableMap.of("k1", "v1")); + FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testResolvingFileIO); + + Assert.assertEquals(testResolvingFileIO.properties(), roundTripSerializedFileIO.properties()); + } +} diff --git a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java index ecb520f1d2..7bc37edc92 100644 --- a/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java +++ b/gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSFileIO.java @@ -30,6 +30,7 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.metrics.MetricsContext; +import org.apache.iceberg.util.SerializableMap; import org.apache.iceberg.util.SerializableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +53,7 @@ public class GCSFileIO implements FileIO { private transient volatile Storage storage; private MetricsContext metrics = MetricsContext.nullMetrics(); private final AtomicBoolean isResourceClosed = new AtomicBoolean(false); - private Map<String, String> properties = null; + private SerializableMap<String, String> properties = null; /** * No-arg constructor to load the FileIO dynamically. @@ -102,7 +103,7 @@ public class GCSFileIO implements FileIO { @Override public Map<String, String> properties() { - return properties; + return properties.immutableMap(); } private Storage client() { @@ -118,8 +119,8 @@ public class GCSFileIO implements FileIO { @Override public void initialize(Map<String, String> props) { - this.properties = props; - this.gcpProperties = new GCPProperties(props); + this.properties = SerializableMap.copyOf(props); + this.gcpProperties = new GCPProperties(properties); this.storageSupplier = () -> { StorageOptions.Builder builder = StorageOptions.newBuilder(); @@ -131,12 +132,17 @@ public class GCSFileIO implements FileIO { // Report Hadoop metrics if Hadoop is available try { DynConstructors.Ctor<MetricsContext> ctor = - DynConstructors.builder(MetricsContext.class).hiddenImpl(DEFAULT_METRICS_IMPL, String.class).buildChecked(); + DynConstructors.builder(MetricsContext.class) + .hiddenImpl(DEFAULT_METRICS_IMPL, String.class) + .buildChecked(); MetricsContext context = ctor.newInstance("gcs"); - context.initialize(props); + context.initialize(properties); this.metrics = context; } catch (NoClassDefFoundError | NoSuchMethodException | ClassCastException e) { - LOG.warn("Unable to load metrics class: '{}', falling back to null metrics", DEFAULT_METRICS_IMPL, e); + LOG.warn( + "Unable to load metrics class: '{}', falling back to null metrics", + DEFAULT_METRICS_IMPL, + e); } return builder.build().getService(); diff --git a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java index 99bb08d4bf..2d31d74c6d 100644 --- a/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java +++ b/gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSFileIOTest.java @@ -28,9 +28,13 @@ import java.io.OutputStream; import java.util.Random; import java.util.stream.StreamSupport; import org.apache.commons.io.IOUtils; +import org.apache.iceberg.TestHelpers; import org.apache.iceberg.gcp.GCPProperties; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -93,4 +97,26 @@ public class GCSFileIOTest { assertThat(StreamSupport.stream(storage.list(TEST_BUCKET).iterateAll().spliterator(), false).count()) .isZero(); } + + @Test + public void testGCSFileIOKryoSerialization() throws IOException { + FileIO testGCSFileIO = new GCSFileIO(); + + // gcs fileIO should be serializable when properties are passed as immutable map + testGCSFileIO.initialize(ImmutableMap.of("k1", "v1")); + FileIO roundTripSerializedFileIO = TestHelpers.KryoHelpers.roundTripSerialize(testGCSFileIO); + + Assert.assertEquals(testGCSFileIO.properties(), roundTripSerializedFileIO.properties()); + } + + @Test + public void testGCSFileIOJavaSerialization() throws IOException, ClassNotFoundException { + FileIO testGCSFileIO = new GCSFileIO(); + + // gcs fileIO should be serializable when properties are passed as immutable map + testGCSFileIO.initialize(ImmutableMap.of("k1", "v1")); + FileIO roundTripSerializedFileIO = TestHelpers.roundTripSerialize(testGCSFileIO); + + Assert.assertEquals(testGCSFileIO.properties(), roundTripSerializedFileIO.properties()); + } } diff --git a/versions.props b/versions.props index 69a08ca22b..c6e157193b 100644 --- a/versions.props +++ b/versions.props @@ -44,3 +44,4 @@ org.springframework:* = 5.3.9 org.springframework.boot:* = 2.5.4 org.mock-server:mockserver-netty = 5.11.1 org.mock-server:mockserver-client-java = 5.11.1 +com.esotericsoftware:kryo = 4.0.2
