This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new ba403009d2 Core: Move Hadoop conf serialization into SerializableConfiguration (#15583)
ba403009d2 is described below
commit ba403009d2636e1f422a865ebae083e77aa443ad
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Mar 13 15:40:14 2026 +0100
Core: Move Hadoop conf serialization into SerializableConfiguration (#15583)
---
.palantir/revapi.yml | 4 ++
.../java/org/apache/iceberg/SerializableTable.java | 40 +---------------
.../org/apache/iceberg/hadoop/HadoopFileIO.java | 15 ++++--
.../iceberg/hadoop/SerializableConfiguration.java | 37 ++++++++-------
.../org/apache/iceberg/io/ResolvingFileIO.java | 2 +-
.../org/apache/iceberg/util/SerializationUtil.java | 2 +-
.../hadoop/TestSerializableConfiguration.java | 55 ++++++++++++++++++++++
7 files changed, 93 insertions(+), 62 deletions(-)
diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index 4ba3d8250d..dc5951256c 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -1370,6 +1370,10 @@ acceptedBreaks:
new: "class org.apache.iceberg.encryption.EncryptingFileIO"
justification: "New method for Manifest List reading"
org.apache.iceberg:iceberg-core:
+ - code: "java.class.defaultSerializationChanged"
+ old: "class org.apache.iceberg.hadoop.SerializableConfiguration"
+ new: "class org.apache.iceberg.hadoop.SerializableConfiguration"
+ justification: "Serialization across versions is not guaranteed"
- code: "java.class.noLongerInheritsFromClass"
old: "class org.apache.iceberg.rest.auth.OAuth2Manager"
new: "class org.apache.iceberg.rest.auth.OAuth2Manager"
diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java
b/core/src/main/java/org/apache/iceberg/SerializableTable.java
index dce7697319..7e5746ec91 100644
--- a/core/src/main/java/org/apache/iceberg/SerializableTable.java
+++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java
@@ -22,14 +22,11 @@ import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.hadoop.HadoopConfigurable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.SerializableMap;
-import org.apache.iceberg.util.SerializableSupplier;
/**
* A read-only serializable table that can be sent to other nodes in a cluster.
@@ -83,7 +80,7 @@ public class SerializableTable implements Table,
HasTableOperations, Serializabl
Map<Integer, PartitionSpec> specs = table.specs();
specs.forEach((specId, spec) -> specAsJsonMap.put(specId,
PartitionSpecParser.toJson(spec)));
this.sortOrderAsJson = SortOrderParser.toJson(table.sortOrder());
- this.io = fileIO(table);
+ this.io = table.io();
this.encryption = table.encryption();
this.locationProviderTry = Try.of(table::locationProvider);
this.refs = SerializableMap.copyOf(table.refs());
@@ -124,14 +121,6 @@ public class SerializableTable implements Table,
HasTableOperations, Serializabl
}
}
- private FileIO fileIO(Table table) {
- if (table.io() instanceof HadoopConfigurable) {
- ((HadoopConfigurable) table.io()).serializeConfWith(SerializableConfSupplier::new);
- }
-
- return table.io();
- }
-
private Table lazyTable() {
if (lazyTable == null) {
synchronized (this) {
@@ -453,31 +442,4 @@ public class SerializableTable implements Table,
HasTableOperations, Serializabl
return type;
}
}
-
- // captures the current state of a Hadoop configuration in a serializable manner
- private static class SerializableConfSupplier implements SerializableSupplier<Configuration> {
-
- private final Map<String, String> confAsMap;
- private transient volatile Configuration conf = null;
-
- SerializableConfSupplier(Configuration conf) {
- this.confAsMap = Maps.newHashMapWithExpectedSize(conf.size());
- conf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue()));
- }
-
- @Override
- public Configuration get() {
- if (conf == null) {
- synchronized (this) {
- if (conf == null) {
- Configuration newConf = new Configuration(false);
- confAsMap.forEach(newConf::set);
- this.conf = newConf;
- }
- }
- }
-
- return conf;
- }
- }
}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
index a4ac5e2ff6..877290f48e 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java
@@ -66,9 +66,14 @@ public class HadoopFileIO implements HadoopConfigurable,
DelegateFileIO {
public HadoopFileIO() {}
public HadoopFileIO(Configuration hadoopConf) {
- this(new SerializableConfiguration(hadoopConf)::get);
+ this(new SerializableConfiguration(hadoopConf));
}
+ /**
+ * @deprecated since 1.11.0, will be removed in 1.12.0; use {@link
+ * HadoopFileIO#HadoopFileIO(Configuration)} instead.
+ */
+ @Deprecated
public HadoopFileIO(SerializableSupplier<Configuration> hadoopConf) {
this.hadoopConf = hadoopConf;
}
@@ -115,7 +120,7 @@ public class HadoopFileIO implements HadoopConfigurable,
DelegateFileIO {
@Override
public void setConf(Configuration conf) {
- this.hadoopConf = new SerializableConfiguration(conf)::get;
+ this.hadoopConf = new SerializableConfiguration(conf);
}
@Override
@@ -125,7 +130,7 @@ public class HadoopFileIO implements HadoopConfigurable,
DelegateFileIO {
if (hadoopConf == null) {
synchronized (this) {
if (hadoopConf == null) {
- this.hadoopConf = new SerializableConfiguration(new Configuration())::get;
+ this.hadoopConf = new SerializableConfiguration(new Configuration());
}
}
}
@@ -133,6 +138,10 @@ public class HadoopFileIO implements HadoopConfigurable,
DelegateFileIO {
return hadoopConf.get();
}
+ /**
+ * @deprecated since 1.11.0, will be removed in 1.12.0.
+ */
+ @Deprecated
@Override
public void serializeConfWith(
Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
diff --git
a/core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java
b/core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java
index 3e9f17455f..8c660bc29d 100644
---
a/core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java
+++
b/core/src/main/java/org/apache/iceberg/hadoop/SerializableConfiguration.java
@@ -18,33 +18,34 @@
*/
package org.apache.iceberg.hadoop;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.SerializableSupplier;
/** Wraps a {@link Configuration} object in a {@link Serializable} layer. */
-public class SerializableConfiguration implements Serializable {
-
- private transient Configuration hadoopConf;
+public class SerializableConfiguration implements SerializableSupplier<Configuration> {
+ private final Map<String, String> confAsMap;
+ private transient volatile Configuration hadoopConf = null;
public SerializableConfiguration(Configuration hadoopConf) {
- this.hadoopConf = hadoopConf;
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
- hadoopConf.write(out);
- }
-
- private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
- in.defaultReadObject();
- hadoopConf = new Configuration(false);
- hadoopConf.readFields(in);
+ this.confAsMap = Maps.newHashMapWithExpectedSize(hadoopConf.size());
+ hadoopConf.forEach(entry -> confAsMap.put(entry.getKey(), entry.getValue()));
}
+ @Override
public Configuration get() {
+ if (hadoopConf == null) {
+ synchronized (this) {
+ if (hadoopConf == null) {
+ Configuration newConf = new Configuration(false);
+ confAsMap.forEach(newConf::set);
+ this.hadoopConf = newConf;
+ }
+ }
+ }
+
return hadoopConf;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
index a39b3413d5..c27c609d94 100644
--- a/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
+++ b/core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java
@@ -154,7 +154,7 @@ public class ResolvingFileIO
@Override
public void setConf(Configuration conf) {
- this.hadoopConf = new SerializableConfiguration(conf)::get;
+ this.hadoopConf = new SerializableConfiguration(conf);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
b/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
index 216f55eae3..c65251abcf 100644
--- a/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SerializationUtil.java
@@ -43,7 +43,7 @@ public class SerializationUtil {
* @return serialized bytes
*/
public static byte[] serializeToBytes(Object obj) {
- return serializeToBytes(obj, conf -> new SerializableConfiguration(conf)::get);
+ return serializeToBytes(obj, SerializableConfiguration::new);
}
/**
diff --git
a/core/src/test/java/org/apache/iceberg/hadoop/TestSerializableConfiguration.java
b/core/src/test/java/org/apache/iceberg/hadoop/TestSerializableConfiguration.java
new file mode 100644
index 0000000000..8912b0135f
--- /dev/null
+++
b/core/src/test/java/org/apache/iceberg/hadoop/TestSerializableConfiguration.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.hadoop;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.TestHelpers;
+import org.junit.jupiter.api.Test;
+
+public class TestSerializableConfiguration {
+
+ @Test
+ public void kryoSerialization() throws IOException {
+ Configuration configuration = new Configuration();
+ configuration.set("prefix.key1", "value1");
+ configuration.set("prefix.key2", "value2");
+ SerializableConfiguration conf = new SerializableConfiguration(configuration);
+ SerializableConfiguration serialized = TestHelpers.KryoHelpers.roundTripSerialize(conf);
+
+ assertThat(serialized.get().getPropsWithPrefix("prefix"))
+ .isEqualTo(conf.get().getPropsWithPrefix("prefix"))
+ .isEqualTo(configuration.getPropsWithPrefix("prefix"));
+ }
+
+ @Test
+ public void javaSerialization() throws IOException, ClassNotFoundException {
+ Configuration configuration = new Configuration();
+ configuration.set("prefix.key1", "value1");
+ configuration.set("prefix.key2", "value2");
+ SerializableConfiguration conf = new SerializableConfiguration(configuration);
+ SerializableConfiguration serialized = TestHelpers.roundTripSerialize(conf);
+
+ assertThat(serialized.get().getPropsWithPrefix("prefix"))
+ .isEqualTo(conf.get().getPropsWithPrefix("prefix"))
+ .isEqualTo(configuration.getPropsWithPrefix("prefix"));
+ }
+}