jessiedanwang commented on issue #5414:
URL: https://github.com/apache/iceberg/issues/5414#issuecomment-1206777099
I have changed it to Java to work around the compilation error, but I am still seeing the same Kryo serialization issue on ImmutableMap:

Caused by: java.lang.UnsupportedOperationException
        at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.put(ImmutableMap.java:714)

Here is the code change I have made; did I miss anything?
Added the Spark config:

.config("spark.kryo.registrator", "xxx.CustomKryoRegistrator")
import com.esotericsoftware.kryo.Kryo;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.spark.serializer.KryoRegistrator;

// Installs a custom serializer for Iceberg's relocated Guava ImmutableMap.
public class CustomKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        final ImmutableRelocatedMapSerializer serializer = new ImmutableRelocatedMapSerializer();
        kryo.register(java.util.HashMap.class);
        kryo.register(ImmutableMap.class, serializer);
    }
}
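One thing I am not sure about: Kryo's register() binds a serializer to that exact class only, while the maps actually handed to Kryo at runtime are concrete subclasses such as RegularImmutableMap, which would fall through to the defaults. If that is the problem, a variant worth trying is addDefaultSerializer, which applies to the class and all of its subclasses (a sketch of the changed method only):

    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(java.util.HashMap.class);
        // Unlike register(), addDefaultSerializer also covers subclasses of
        // ImmutableMap (RegularImmutableMap, SingletonImmutableBiMap, ...).
        kryo.addDefaultSerializer(ImmutableMap.class, new ImmutableRelocatedMapSerializer());
    }

The serializer itself: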
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.util.HashMap;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

// Serializes the relocated ImmutableMap by copying it into a mutable HashMap
// on write and rebuilding it with ImmutableMap.copyOf on read.
public class ImmutableRelocatedMapSerializer extends Serializer<ImmutableMap> {
    private static final boolean DOES_NOT_ACCEPT_NULL = true;
    private static final boolean IMMUTABLE = true;

    public ImmutableRelocatedMapSerializer() {
        super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
    }

    @Override
    public void write(Kryo kryo, Output output, ImmutableMap object) {
        // Plain java.util.HashMap instead of unrelocated Guava's Maps.newHashMap,
        // so this class does not pull in a second (non-relocated) Guava.
        kryo.writeObject(output, new HashMap<>(object));
    }

    @Override
    public ImmutableMap read(Kryo kryo, Input input, Class<ImmutableMap> type) {
        Map map = kryo.readObject(input, HashMap.class);
        return ImmutableMap.copyOf(map);
    }
}
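To check the serializer in isolation, here is a standalone round-trip test I can run outside Spark (just a sketch; SerializerSmokeTest is my throwaway name, and it assumes a plain Kryo 4.x instance, the line Spark 3.x bundles):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class SerializerSmokeTest {
    public static void main(String[] args) {
        Kryo kryo = new Kryo();
        kryo.register(java.util.HashMap.class);
        // Bind via addDefaultSerializer so the concrete runtime subclass
        // (e.g. RegularImmutableMap) is matched as well.
        kryo.addDefaultSerializer(ImmutableMap.class, new ImmutableRelocatedMapSerializer());

        ImmutableMap<String, String> original = ImmutableMap.of("k1", "v1", "k2", "v2");

        // Serialize with the class name written alongside the object...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        Output output = new Output(bytes);
        kryo.writeClassAndObject(output, original);
        output.close();

        // ...then deserialize; read() should rebuild an equal ImmutableMap.
        Input input = new Input(new ByteArrayInputStream(bytes.toByteArray()));
        Object roundTripped = kryo.readClassAndObject(input);
        input.close();

        System.out.println(original.equals(roundTripped)); // expect: true
    }
}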
I have also tried the following, which did not work either:

val sparkConf = new SparkConf().setAppName(appName)
  .registerKryoClasses(Array(classOf[org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap[Any, Any]]))
spark = SparkSession
  .builder()
  .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
  // Iceberg-related configs
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1")
  .config("spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.iceberg_catalog.warehouse", warehousePath)
  .config("spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
  .config("spark.sql.catalog.iceberg_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
  .config("spark.sql.catalog.iceberg_catalog.client.factory", "org.apache.iceberg.aws.AssumeRoleAwsClientFactory")
  .config("spark.sql.catalog.iceberg_catalog.client.assume-role.arn", s"arn:aws:iam::$catalogId:role/$role")
  .config("spark.sql.catalog.iceberg_catalog.client.assume-role.region", "us-east-2")
  .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true")
  .config("spark.hadoop.hive.metastore.glue.catalogid", catalogId)
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config(sparkConf)
  .enableHiveSupport()
  .getOrCreate()
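As far as I can tell, registerKryoClasses only registers the classes with Kryo's default serializer for that type (MapSerializer for maps), whose read path rebuilds the map via put(), which would explain the UnsupportedOperationException above. If that is right, the conf needs to point at the custom registrator instead (a sketch of what I mean, in Java like the rest of my code):

import org.apache.spark.SparkConf;

SparkConf sparkConf = new SparkConf()
    .setAppName(appName)
    // Both settings are needed: Kryo as the serializer, and the registrator
    // that installs the ImmutableMap serializer into each Kryo instance.
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", CustomKryoRegistrator.class.getName());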
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]